Revert "SENTRY-2109: Fix the logic of identifying HMS out of Sync and handle gaps and out-of-sequence notifications.(Kalyan Kumar kalvagadda, reviewed-by Vadim Spector, Na Li and Arjun Mishra)" Reverting the patch as it needs some more reviewers to look into it.
This reverts commit afcaa4997afae428522e7f0d7fb5917e9e58580d. Project: http://git-wip-us.apache.org/repos/asf/sentry/repo Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/a178d7ed Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/a178d7ed Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/a178d7ed Branch: refs/heads/master Commit: a178d7ed86a48be937e780487333103870a70064 Parents: afcaa49 Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Authored: Fri Feb 2 15:13:24 2018 -0600 Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com> Committed: Fri Feb 2 15:13:24 2018 -0600 ---------------------------------------------------------------------- .../exception/SentryOutOfSyncException.java | 26 --- .../db/service/persistent/HMSFollower.java | 104 ++++++--- .../db/service/persistent/SentryStore.java | 30 +-- .../service/thrift/HiveNotificationFetcher.java | 204 ++--------------- .../sentry/service/thrift/SentryService.java | 4 - .../sentry/service/thrift/ServiceConstants.java | 6 - .../db/service/persistent/TestHMSFollower.java | 86 +++---- .../TestHMSFollowerSentryStoreIntegration.java | 4 +- .../db/service/persistent/TestSentryStore.java | 32 +-- .../thrift/TestHiveNotificationFetcher.java | 222 +------------------ .../TestHiveNotificationFetcherCache.java | 203 ----------------- .../e2e/dbprovider/TestSnapshotCreation.java | 87 -------- ...tSnapshotCreationWithShorterHMSEventTtl.java | 105 --------- ...shotWithLongerHMSFollowerLongerInterval.java | 114 ---------- .../tests/e2e/hdfs/TestHDFSIntegrationBase.java | 43 +--- 15 files changed, 163 insertions(+), 1107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java ---------------------------------------------------------------------- diff --git a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java b/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java deleted file mode 100644 index d2b1945..0000000 --- a/sentry-core/sentry-core-common/src/main/java/org/apache/sentry/core/common/exception/SentryOutOfSyncException.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.core.common.exception; - -public class SentryOutOfSyncException extends Exception { - private static final long serialVersionUID = 1L; - public SentryOutOfSyncException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java index 45e4305..2f2b984 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/HMSFollower.java @@ -18,19 +18,18 @@ package org.apache.sentry.provider.db.service.persistent; +import org.apache.sentry.core.common.utils.PubSub; +import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; + import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME; import static org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME_DEPRECATED; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - import java.util.Collection; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import javax.jdo.JDODataStoreException; - -import org.apache.sentry.core.common.exception.SentryOutOfSyncException; -import org.apache.sentry.core.common.utils.PubSub; -import org.apache.sentry.hdfs.ServiceConstants.ServerConfig; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.thrift.TException; @@ -110,7 +109,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { notificationProcessor = new NotificationProcessor(sentryStore, authServerName, authzConf); client = new SentryHMSClient(authzConf, hiveConnectionFactory); hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabledNoCache(authzConf); // no cache to test different settings for hdfs sync - notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory, authzConf); + notificationFetcher = new HiveNotificationFetcher(sentryStore, hiveConnectionFactory); // subscribe to full update notification if (conf.getBoolean(ServerConfig.SENTRY_SERVICE_FULL_UPDATE_PUBSUB, false)) { @@ -148,25 +147,25 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { @Override public void run() { SentryStateBank.enableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); - long maxNotificationId; + long lastProcessedNotificationId; try { try { - // Initializing maxNotificationId based on the latest persisted notification ID. - maxNotificationId = sentryStore.getMaxNotificationID(); + // Initializing lastProcessedNotificationId based on the latest persisted notification ID. + lastProcessedNotificationId = sentryStore.getLastProcessedNotificationID(); } catch (Exception e) { LOGGER.error("Failed to get the last processed notification id from sentry store, " + "Skipping the processing", e); return; } // Wake any clients connected to this service waiting for HMS already processed notifications. - wakeUpWaitingClientsForSync(maxNotificationId); + wakeUpWaitingClientsForSync(lastProcessedNotificationId); // Only the leader should listen to HMS updates if (!isLeader()) { // Close any outstanding connections to HMS close(); return; } - syncupWithHms(maxNotificationId); + syncupWithHms(lastProcessedNotificationId); } finally { SentryStateBank.disableState(HMSFollowerState.COMPONENT,HMSFollowerState.STARTED); } @@ -190,9 +189,8 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { * * <p>Clients connections waiting for an event notification will be * woken up afterwards. - * @param maxNotificationId Max of all event-id's that sentry has processed. */ - private void syncupWithHms(long maxNotificationId) { + private void syncupWithHms(long notificationId) { try { client.connect(); connectedToHms = true; @@ -203,17 +201,18 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } try { - Collection<NotificationEvent> notifications; /* Before getting notifications, it checks if a full HMS snapshot is required. */ - if (isFullSnapshotRequired(maxNotificationId)) { + if (isFullSnapshotRequired(notificationId)) { createFullSnapshot(); return; } - try { - notifications = notificationFetcher.fetchNotifications(maxNotificationId); - } catch (SentryOutOfSyncException e) { - LOGGER.error("An error occurred while fetching HMS notifications: {}", - e.getMessage()); + + Collection<NotificationEvent> notifications = + notificationFetcher.fetchNotifications(notificationId); + + // After getting notifications, it checks if the HMS did some clean-up and notifications + // are out-of-sync with Sentry. + if (areNotificationsOutOfSync(notifications, notificationId)) { createFullSnapshot(); return; } @@ -226,7 +225,7 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } // Continue with processing new notifications if no snapshots are done. - processNotifications(notifications, maxNotificationId); + processNotifications(notifications); } catch (TException e) { LOGGER.error("An error occurred while fetching HMS notifications: ", e); close(); @@ -285,6 +284,52 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { } /** + * Checks if the HMS and Sentry processed notifications are out-of-sync. + * This could happen because the HMS did some clean-up of old notifications + * and Sentry was not requesting notifications during that time. + * + * @param events All new notifications to check for an out-of-sync. + * @param latestProcessedId The latest notification processed by Sentry to check against the + * list of notifications events. + * @return True if an out-of-sync is found; False otherwise. + */ + private boolean areNotificationsOutOfSync(Collection<NotificationEvent> events, + long latestProcessedId) { + if (events.isEmpty()) { + return false; + } + + /* + * If the sequence of notifications has a gap, then an out-of-sync might + * have happened due to the following issue: + * + * - HDFS sync was disabled or Sentry was shutdown for a time period longer than + * the HMS notification clean-up thread causing old notifications to be deleted. + * + * HMS notifications may contain both gaps in the sequence and duplicates + * (the same ID repeated more then once for different events). + * + * To accept duplicates (see NotificationFetcher for more info), then a gap is found + * if the 1st notification received is higher than the current ID processed + 1. + * i.e. + * 1st ID = 3, latest ID = 3 (duplicate found but no gap detected) + * 1st ID = 4, latest ID = 3 (consecutive ID found but no gap detected) + * 1st ID = 5, latest ID = 3 (a gap is detected) + */ + + List<NotificationEvent> eventList = (List<NotificationEvent>) events; + long firstNotificationId = eventList.get(0).getEventId(); + + if (firstNotificationId > (latestProcessedId + 1)) { + LOGGER.info("First HMS event notification Id = {} is greater than latest Sentry processed" + + "notification Id = {} + 1. Need to request a full HMS snapshot.", firstNotificationId, latestProcessedId); + return true; + } + + return false; + } + + /** * Request for full snapshot and persists it if there is no snapshot available in the sentry * store. Also, wakes-up any waiting clients. * @@ -347,27 +392,16 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { * Also, persists the notification ID regardless of processing result. * * @param events list of event to be processed - * @param notificationId Max event-id that sentry processed so far. * @throws Exception if the complete notification list is not processed because of JDO Exception */ - public void processNotifications(Collection<NotificationEvent> events, long notificationId) throws Exception { + public void processNotifications(Collection<NotificationEvent> events) throws Exception { boolean isNotificationProcessed; - long eventIdProcessed = notificationId; if (events.isEmpty()) { return; } for (NotificationEvent event : events) { isNotificationProcessed = false; - if (eventIdProcessed > 0) { - if (eventIdProcessed == event.getEventId()) { - LOGGER.info("Processing event with Duplicate event-id: {}", eventIdProcessed); - } else if (eventIdProcessed != event.getEventId() - 1) { - LOGGER.info("Events between ID's " + eventIdProcessed + " and " - + event.getEventId() + " are either missing OR out of order"); - } - } - eventIdProcessed = event.getEventId(); try { // Only the leader should process the notifications if (!isLeader()) { @@ -375,12 +409,11 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { return; } isNotificationProcessed = notificationProcessor.processNotificationEvent(event); - notificationFetcher.updateCache(event); } catch (Exception e) { if (e.getCause() instanceof JDODataStoreException) { LOGGER.info("Received JDO Storage Exception, Could be because of processing " + "duplicate notification"); - if (event.getEventId() <= sentryStore.getMaxNotificationID()) { + if (event.getEventId() <= sentryStore.getLastProcessedNotificationID()) { // Rest of the notifications need not be processed. LOGGER.error("Received event with Id: {} which is smaller then the ID " + "persisted in store", event.getEventId()); @@ -398,7 +431,6 @@ public class HMSFollower implements Runnable, AutoCloseable, PubSub.Subscriber { // Continue processing the next notification. LOGGER.debug("Explicitly Persisting Notification ID = {} ", event.getEventId()); sentryStore.persistLastProcessedNotificationID(event.getEventId()); - notificationFetcher.updateCache(event); } catch (Exception failure) { LOGGER.error("Received exception while persisting the notification ID = {}", event.getEventId()); throw failure; http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java index 03a6f45..edea5b6 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java @@ -40,7 +40,6 @@ import javax.jdo.PersistenceManager; import javax.jdo.PersistenceManagerFactory; import javax.jdo.Query; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.sentry.core.common.exception.SentryAccessDeniedException; @@ -468,7 +467,7 @@ public class SentryStore { @Override public Long getValue() { try { - return getMaxNotificationID(); + return getLastProcessedNotificationID(); } catch (Exception e) { LOGGER.error("Can not read current notificationId", e); return NOTIFICATION_UNKNOWN; @@ -553,7 +552,7 @@ public class SentryStore { } @VisibleForTesting - public void clearAllTables() { + void clearAllTables() { try { tm.executeTransaction( new TransactionBlock<Object>() { @@ -2804,7 +2803,7 @@ public class SentryStore { * * @return the last persisted snapshot ID. It returns 0 if no rows are found. */ - public long getCurrentAuthzPathsSnapshotID() throws Exception { + private long getCurrentAuthzPathsSnapshotID() throws Exception { return tm.executeTransaction( new TransactionBlock<Long>() { @Override @@ -3887,7 +3886,7 @@ public class SentryStore { * @return the notification ID of latest path change. If no change * found then return 0. */ - public Long getMaxNotificationID() throws Exception { + public Long getLastProcessedNotificationID() throws Exception { long notificationId = tm.executeTransaction( new TransactionBlock<Long>() { public Long execute(PersistenceManager pm) throws Exception { @@ -4236,25 +4235,4 @@ public class SentryStore { } }); } - - /** - * Checks if notification with particular ID was already processed by searching - * for the ID on the MSentryHmsNotification table. - * - * @param id: event_id of the notification event. - * @return True if the notification was already processed; False otherwise - */ - public boolean isNotificationIdProcessed(long id) throws Exception { - return tm.executeTransactionWithRetry(new TransactionBlock<Boolean>() { - @Override - public Boolean execute(PersistenceManager pm) throws Exception { - pm.setDetachAllOnCommit(false); - Query query = pm.newQuery(MSentryHmsNotification.class); - query.setFilter("this.notificationId == id"); - query.declareParameters("long id"); - List<MSentryHmsNotification> ids = (List<MSentryHmsNotification>) query.execute(id); - return (CollectionUtils.isEmpty(ids)) ? false : true; - } - }); - } } http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java index 30d6f50..93cc34f 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HiveNotificationFetcher.java @@ -18,18 +18,10 @@ package org.apache.sentry.service.thrift; -import static com.codahale.metrics.MetricRegistry.name; - -import com.codahale.metrics.Counter; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Strings; - import java.util.Collections; -import java.util.List; import java.util.HashSet; - -import org.apache.commons.collections.map.LRUMap; -import org.apache.hadoop.conf.Configuration; +import java.util.List; +import java.util.Set; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; @@ -37,8 +29,6 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.apache.sentry.core.common.exception.SentryOutOfSyncException; -import org.apache.sentry.provider.db.service.thrift.SentryMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,54 +41,14 @@ public final class HiveNotificationFetcher implements AutoCloseable { private final SentryStore sentryStore; private final HiveConnectionFactory hmsConnectionFactory; private HiveMetaStoreClient hmsClient; - /* - This value helps HiveNotificationFetcher to decide the notification-id to use while fetching the notifications from - HMS to handle out-of-sequence notifications. - */ - private final int refetchCount; - - /* - In each iteration, HMSFollower will try to fetch notification which has the Max(event-id) - among the notifications it already fetched. This flag is used to check if that notification - is fetched in subsequent request. It's initialized to false in every iteration if sentry has fetched notifications - in any one the previous fetches. - */ - private boolean foundLastProcessedNotification = false; - /* - Idea of this cache is to store the notification event id and hash to avoid database lookup as there will be a lot - of notifications that are fetched again and again to handle out-of-sync notifications. Least recently used entries - in this cache are evicted automatically once the map is full. - */ - private final LRUMap cache; - - /* - Cache is designed to hold all the events with event-id's in the range of MAX(event-id) (that sentry processed) and - MAX(event-id) - (notification fetch count) from the configuration. Theoretically, all of these event-id's could have - duplicates. Cache size should be sufficient to hold all of them. This factor is used to calculate the size of cache. - */ - public static final int CACHE_BUFFER_FACTOR = 10; - -/* - This value is used to build a cache. Cache anticipates below duplicates and pre-allocated memory. - */ - private static final int DUPLICATE_COUNT = 3; - // Counter for failed transactions - private static final Counter notificationCacheMissCount = - SentryMetrics.getInstance(). - getCounter(name(HiveNotificationFetcher.class, - "Cache Miss")); + /* The following cache and last filtered ID help us to avoid making less calls to the DB */ + private long lastIdFiltered = 0; + private Set<String> cache = new HashSet<>(); - public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory, - Configuration conf) { - refetchCount = conf.getInt(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, - ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT); + public HiveNotificationFetcher(SentryStore sentryStore, HiveConnectionFactory hmsConnectionFactory) { this.sentryStore = sentryStore; this.hmsConnectionFactory = hmsConnectionFactory; - /* - Size of map is dependent on SENTRY_HMS_NOTIFICATION_REFETCH_COUNT and some additional buffer. - */ - this.cache = new LRUMap(refetchCount + (refetchCount / CACHE_BUFFER_FACTOR)); } /** @@ -114,82 +64,12 @@ public final class HiveNotificationFetcher implements AutoCloseable { } /** - * Update cache with notification hash and event-id - * @param event Notification event - */ - public void updateCache(NotificationEvent event) { - HashSet<String> eventSet; - String hash = UniquePathsUpdate.sha1(event); - - if(Strings.isNullOrEmpty(hash)) { - LOGGER.error("Hash provided is either null/empty, Cache not updated"); - } - eventSet = (HashSet<String>)cache.get(event.getEventId()); - if(eventSet != null) { - eventSet.add(hash); - } else { - eventSet = new HashSet<>(DUPLICATE_COUNT, 1); - eventSet.add(hash); - cache.put(event.getEventId(), eventSet); - } - } - - /** - * Find if the Notification is found in Cache. - * @param eventId notification event id - * @param hash notification hash - * @return True, if the hash is found in Cache - * False, otherwise - */ - @VisibleForTesting - boolean isFoundInCache(long eventId, String hash) { - HashSet<String> eventSet; - eventSet = (HashSet<String>)cache.get(eventId); - - if(eventSet == null) { - return false; - } else { - return eventSet.contains(hash); - } - } - - /** - * Find if the Notification event-id is found in Cache. - * @param eventId notification event id. - * @return True, if the event-id is found in Cache - * False, otherwise - */ - boolean isFoundInCache(long eventId) { - return cache.get(eventId) != null; - } - - /** - * Get the Cache size - * @return size of the Cache - */ - @VisibleForTesting - int getCacheSize() { - return cache.size(); - } - - /** * Fetch new HMS notifications appeared since a specified event ID. The returned list may * include notifications with the same specified ID if they were not seen by Sentry. - * In each iteration HiveNotificationFetcher will get the Max(event-id) that sentry has processed and tries to go back - * by configured number of event-ids and re-fetches them. Idea is that Max(event-id) should be fetched in subsequent - * fetch. If not, sentry should consider create full snapshot.This could happen for several scenario's. - * <ul> - * <li>HMS is been restored from a snapshot taken in the past</li> - * <li>Sentry did not fetch notifications from for a while. One use case is HDFS Sync being disabled</li> - * <li>HMS is intentionally reset</li> - * <li>NOTIFICATION_LOG table is cleared</li> - * </ul> * * @param lastEventId The event ID to use to request notifications. * @param maxEvents The maximum number of events to fetch. * @return A list of newer notifications unseen by Sentry. - * @throws SentryOutOfSyncException If event with event-id equals to Max(event-id) processed by sentry is not received - * in HMS response. * @throws Exception If an error occurs on the HMS communication. */ List<NotificationEvent> fetchNotifications(long lastEventId, int maxEvents) throws Exception { @@ -204,41 +84,16 @@ public final class HiveNotificationFetcher implements AutoCloseable { * * TODO: We can avoid doing this once HIVE-16886 is fixed. */ - - // Value of lastEventId has to be retained for logging purposes, making a copy. - long minFetchId = lastEventId; - if (minFetchId > 0) { - filter = createNotificationFilterFor(minFetchId); - minFetchId = (minFetchId > refetchCount) ? (minFetchId - refetchCount) : 0; - foundLastProcessedNotification = false; - } else { - foundLastProcessedNotification = true; + if (lastEventId > 0) { + filter = createNotificationFilterFor(lastEventId); + lastEventId--; } - LOGGER.debug("Requesting HMS notifications since ID = {} Max(Event-id) processed: {}", minFetchId, lastEventId); + LOGGER.debug("Requesting HMS notifications since ID = {}", lastEventId); NotificationEventResponse response; try { - /** - * Logic below triggers full-snapshots in below situation. - * 1. When HMS response with notification events does not have the event with Max(Event-id) processed by sentry - * and the event-id of the first event in the response is not equal to Max(Event-id)+1 processed by sentry - * Logic below will not trigger full-snapshots in below situations. - * 1. When the response doesn't have ay notification events. This will be seen in situations where there no changes - * to HMS data for a white and all the notification events are evicted from the NOTIFICATION_LOG table because of - * TTL expiration. - * 2. When HMS response with notification events does have the event with Max(Event-id) processed by sentry. - */ - response = getHmsClient().getNextNotification(minFetchId, maxEvents, filter); - if ((response != null) && (response.getEventsSize() > 0) && !foundLastProcessedNotification && - (response.getEvents().get(0).getEventId() != (lastEventId + 1))) { - LOGGER.error("Max event-id processed by sentry is " + lastEventId + " but the " + - "Id of the first event received from HMS in subsequent fetch is " + - response.getEvents().get(0).getEventId() + ", Requesting for Full snapshot"); - //Full snapshot should be requested. - throw new SentryOutOfSyncException("Notification Log doesn't have the " + - "last notification processed by sentry"); - } + response = getHmsClient().getNextNotification(lastEventId, maxEvents, filter); } catch (Exception e) { close(); throw e; @@ -266,31 +121,27 @@ public final class HiveNotificationFetcher implements AutoCloseable { * specified ID. If a new filter ID is used, then we clean up the cache. */ + if (lastIdFiltered != id) { + lastIdFiltered = id; + cache.clear(); + } + return new NotificationFilter() { @Override public boolean accept(NotificationEvent notificationEvent) { - LOGGER.debug("Applying filter created for event-id {} on Event with ID:{}", id, notificationEvent.getEventId()); - if (notificationEvent.getEventId() <= id) { + if (notificationEvent.getEventId() == id) { String hash = UniquePathsUpdate.sha1(notificationEvent); + try { - // This check makes sure that the last notification processed by - // HMSFollower is present in NOTIFICATION_LOG table. If it is not found - // it can be assumed that there were events that were cleaned up before - // Sentry could fetch them. When this happens sentry should take full snapshot again. - if ((notificationEvent.getEventId() == id) && !foundLastProcessedNotification) { - foundLastProcessedNotification = true; - } - if (isFoundInCache(notificationEvent.getEventId(), hash) == true) { - LOGGER.debug("Ignoring HMS notification already processed: ID = {}", notificationEvent.getEventId()); - return false; - } else if (sentryStore.isNotificationProcessed(hash)) { - notificationCacheMissCount.inc(); - LOGGER.debug("Ignoring HMS notification already processed: ID = {} - cache miss", notificationEvent.getEventId()); + if (cache.contains(hash) || sentryStore.isNotificationProcessed(hash)) { + cache.add(hash); + + LOGGER.debug("Ignoring HMS notification already processed: ID = {}", id); return false; } } catch (Exception e) { LOGGER.error("An error occurred while checking if notification {} is already " - + "processed: {}", notificationEvent.getEventId(), e); + + "processed: {}", id, e.getMessage()); // We cannot throw an exception on this filter, so we return false assuming this // notification is already processed @@ -352,16 +203,9 @@ public final class HiveNotificationFetcher implements AutoCloseable { hmsClient.close(); } + cache.clear(); } finally { hmsClient = null; } } - - /** - * Gets notification cache miss count. - * @return notification cache miss count. - */ - public long getNotificationCacheMissCount() { - return notificationCacheMissCount.getCount(); - } } http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java index 827b078..96c6810 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/SentryService.java @@ -642,8 +642,4 @@ public class SentryService implements Callable, SigUtils.SigListener { // Become follower leaderMonitor.deactivate(); } - @VisibleForTesting - public long getCurrentAuthzPathsSnapshotID() throws Exception { - return sentryStore.getCurrentAuthzPathsSnapshotID(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java index d16d2fd..7e02874 100644 --- a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java +++ b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/ServiceConstants.java @@ -241,12 +241,6 @@ public class ServiceConstants { public static final int SENTRY_DELTA_KEEP_COUNT_DEFAULT = 200; /** - * Number of notifications that HMSFollower should re-fetch in periodic pull from HMS. - */ - public static final String SENTRY_HMS_NOTIFICATION_REFETCH_COUNT = "sentry_hms_notification_refetch_count"; - public static final int SENTRY_HMS_NOTIFICATION_REFETCH_COUNT_DEFAULT = 100; - - /** * Number of notification id's to keep around during cleaning */ public static final String SENTRY_HMS_NOTIFICATION_ID_KEEP_COUNT = "sentry.server.delta.keep.count"; http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java index 22ec61b..7903078 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollower.java @@ -64,8 +64,6 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import javax.security.auth.login.LoginException; @@ -130,7 +128,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -142,7 +140,7 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -200,7 +198,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(true); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -212,7 +210,7 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -224,7 +222,7 @@ public class TestHMSFollower { // but because of full update trigger it will, as in the first run PubSub.getInstance().publish(PubSub.Topic.HDFS_SYNC_HMS, "message"); - when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(1)).persistFullPathsImage( @@ -235,7 +233,7 @@ public class TestHMSFollower { // 4th run should not get a snapshot because is already processed and publish-subscribe // trigger is only supposed to work once. This is exactly as 2nd run. - when(sentryStore.getMaxNotificationID()).thenReturn(fullSnapshot.getId()); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(fullSnapshot.getId()); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -286,7 +284,7 @@ public class TestHMSFollower { // 1st run should get a full snapshot because hms notificaions is empty // but it should never be persisted because HDFS sync is disabled - when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage( @@ -311,7 +309,7 @@ public class TestHMSFollower { //Set last processed notification Id to match the full new value 1L final long LATEST_EVENT_ID = 1L; - when(sentryStore.getMaxNotificationID()).thenReturn(LATEST_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(LATEST_EVENT_ID); //Mock that sets isHmsNotificationEmpty to false when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); // Mock that sets the current HMS notification ID. Set it to match @@ -371,7 +369,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot - when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(1)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -380,7 +378,7 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); @@ -418,33 +416,20 @@ public class TestHMSFollower { SentryHMSClient sentryHmsClient = Mockito.mock(SentryHMSClient.class); when(sentryHmsClient.getFullSnapshot()).thenReturn(fullSnapshot); - when(hmsClientMock.getNextNotification(Mockito.anyLong(), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; - NotificationEventResponse response = new NotificationEventResponse(); - - List<NotificationEvent> events = Arrays.<NotificationEvent>asList( + when(hmsClientMock.getNextNotification(Mockito.eq(SENTRY_PROCESSED_EVENT_ID - 1), Mockito.eq(Integer.MAX_VALUE), + (NotificationFilter) Mockito.notNull())) + .thenReturn(new NotificationEventResponse( + Arrays.<NotificationEvent>asList( new NotificationEvent(fullSnapshot.getId(), 0, "", "") - ); - - for (NotificationEvent event : events) { - if (filter.accept(event)) { - response.addToEvents(event); - } - } + ) + )); - return response; - } - }); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hmsConnectionMock, hiveInstance); hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot - when(sentryStore.getMaxNotificationID()) + when(sentryStore.getLastProcessedNotificationID()) .thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isHmsNotificationEmpty()).thenReturn(false); hmsFollower.run(); @@ -454,11 +439,8 @@ public class TestHMSFollower { reset(sentryStore); // 2nd run should not get a snapshot because is already processed - when(sentryStore.getMaxNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(HMS_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); - when(sentryStore.isNotificationIdProcessed(fullSnapshot.getId())).thenReturn(true); - when(sentryStore.isNotificationProcessed(UniquePathsUpdate.sha1(new NotificationEvent( - fullSnapshot.getId(), 0, "", "")))).thenReturn(true); hmsFollower.run(); verify(sentryStore, times(0)).persistFullPathsImage(Mockito.anyMap(), Mockito.anyLong()); verify(sentryStore, times(0)).persistLastProcessedNotificationID(Mockito.anyLong()); @@ -543,7 +525,7 @@ public class TestHMSFollower { events.add(notificationEvent); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -573,7 +555,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -607,7 +589,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -642,7 +624,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -683,7 +665,7 @@ public class TestHMSFollower { HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); @@ -742,7 +724,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -766,7 +748,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; //Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle ADD_PARTITION notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -786,7 +768,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that persistLastProcessedNotificationID is invoked explicitly. verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); reset(sentryStore); @@ -804,7 +786,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that updateAuthzPathsMapping was invoked once to handle ALTER_PARTITION // notification and persistLastProcessedNotificationID was not invoked. verify(sentryStore, times(1)).updateAuthzPathsMapping(Mockito.anyString(), @@ -827,7 +809,7 @@ public class TestHMSFollower { notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -882,7 +864,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -907,7 +889,7 @@ public class TestHMSFollower { events.add(notificationEvent); inputEventId += 1; // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that renameAuthzObj and deleteAuthzPathsMapping were not invoked // to handle CREATE_TABLE notification // and persistLastProcessedNotificationID is explicitly invoked @@ -934,7 +916,7 @@ public class TestHMSFollower { notificationEvent.setTableName(tableName2); events.add(notificationEvent); // Process the notification - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // Make sure that addAuthzPathsMapping was invoked once to handle CREATE_TABLE notification // and persistLastProcessedNotificationID was not invoked. //noinspection unchecked @@ -988,7 +970,7 @@ public class TestHMSFollower { Configuration configuration = new Configuration(); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); // invalid event updates notification ID directly verify(sentryStore, times(1)).persistLastProcessedNotificationID(inputEventId - 1); @@ -1030,7 +1012,7 @@ public class TestHMSFollower { hmsFollower.setSentryHmsClient(sentryHmsClient); // 1st run should get a full snapshot because AuthzPathsMapping is empty - when(sentryStore.getMaxNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); + when(sentryStore.getLastProcessedNotificationID()).thenReturn(SENTRY_PROCESSED_EVENT_ID); when(sentryStore.isAuthzPathsMappingEmpty()).thenReturn(false); when(sentryStore.isHmsNotificationEmpty()).thenReturn(true); hmsFollower.run(); @@ -1064,7 +1046,7 @@ public class TestHMSFollower { Configuration configuration = new Configuration(); HMSFollower hmsFollower = new HMSFollower(configuration, sentryStore, null, hiveConnectionFactory, hiveInstance); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance); authorizable.setServer(hiveInstance); http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java index 996f554..91c90f9 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestHMSFollowerSentryStoreIntegration.java @@ -190,7 +190,7 @@ public class TestHMSFollowerSentryStoreIntegration { List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1) .size()); @@ -248,7 +248,7 @@ public class TestHMSFollowerSentryStoreIntegration { List<NotificationEvent> events = new ArrayList<>(); events.add(notificationEvent); - hmsFollower.processNotifications(events, 0); + hmsFollower.processNotifications(events); Assert.assertEquals(1, sentryStore.getAllTSentryPrivilegesByRoleName(roleName1) .size()); http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java index 9a03f48..b410027 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/provider/db/service/persistent/TestSentryStore.java @@ -2466,7 +2466,7 @@ public class TestSentryStore extends org.junit.Assert { long notificationID = 11; sentryStore.persistFullPathsImage(authzPaths, notificationID); PathsUpdate pathsUpdate = sentryStore.retrieveFullPathsImageUpdate(prefixes); - long savedNotificationID = sentryStore.getMaxNotificationID(); + long savedNotificationID = sentryStore.getLastProcessedNotificationID(); assertEquals(1, pathsUpdate.getImgNum()); TPathsDump pathDump = pathsUpdate.toThrift().getPathsDump(); Map<Integer, TPathEntry> nodeMap = pathDump.getNodeMap(); @@ -2519,7 +2519,7 @@ public class TestSentryStore extends org.junit.Assert { sentryStore.addAuthzPathsMapping("db2", Arrays.asList("/hive/db2"), update2); // Check the latest persisted ID matches to both the path updates - long latestID = sentryStore.getMaxNotificationID(); + long latestID = sentryStore.getLastProcessedNotificationID(); assertEquals(notificationID, latestID); String []prefixes = {"/hive"}; @@ -2585,7 +2585,7 @@ public class TestSentryStore extends org.junit.Assert { sentryStore.persistLastProcessedNotificationID(notificationID); // Retrieving latest peristed ID should match with the previous persisted ID - long latestID = sentryStore.getMaxNotificationID(); + long latestID = sentryStore.getLastProcessedNotificationID(); assertEquals(notificationID, latestID); } @@ -2597,7 +2597,7 @@ public class TestSentryStore extends org.junit.Assert { sentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), notificationID); // Add "db1.table1" authzObj - Long lastNotificationId = sentryStore.getMaxNotificationID(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1", 1, false); addUpdate.newPathChange("db1.table"). addToAddPaths(Arrays.asList("db1", "tbl1")); @@ -2624,7 +2624,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange addPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(addUpdate.JSONSerialize(), addPathChange.getPathChange()); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(1, lastNotificationId.longValue()); // Delete path 'db1.db/tbl1' from "db1.table1" authzObj. @@ -2649,7 +2649,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange delPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delUpdate.JSONSerialize(), delPathChange.getPathChange()); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(2, lastNotificationId.longValue()); // Delete "db1.table" authzObj from the authzObj -> [Paths] mapping. @@ -2674,7 +2674,7 @@ public class TestSentryStore extends org.junit.Assert { MSentryPathChange delAllPathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(delAllupdate.JSONSerialize(), delAllPathChange.getPathChange()); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(3, lastNotificationId.longValue()); } @@ -2682,7 +2682,7 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testRenameUpdateAuthzPathsMapping() throws Exception { Map<String, Collection<String>> authzPaths = new HashMap<>(); - Long lastNotificationId = sentryStore.getMaxNotificationID(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", "user/hive/warehouse/db1.db/table1/p1")); authzPaths.put("db1.table2", Sets.newHashSet("user/hive/warehouse/db1.db/table2")); @@ -2728,7 +2728,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(1, lastNotificationId.longValue()); // Rename 'db1.table1' to "db1.table2" but did not change its location. renameUpdate = new UniquePathsUpdate("u2",2, false); @@ -2752,7 +2752,7 @@ public class TestSentryStore extends org.junit.Assert { assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList("user/hive/warehouse/db1.db/table1/p1", "user/hive/warehouse/db1.db/newTable1"), pathsImage.get("db1.newTable2"))); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(2, lastNotificationId.longValue()); // Query the persisted path change and ensure it equals to the original one @@ -2790,7 +2790,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange updatePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(update.JSONSerialize(), updatePathChange.getPathChange()); - lastNotificationId = sentryStore.getMaxNotificationID(); + lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(3, lastNotificationId.longValue()); } @@ -2914,7 +2914,7 @@ public class TestSentryStore extends org.junit.Assert { assertEquals(1, pathImage.get("db2.table").size()); assertEquals(3, sentryStore.getMPaths().size()); - Long lastNotificationId = sentryStore.getMaxNotificationID(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(notificationID, lastNotificationId.longValue()); } @@ -3419,7 +3419,7 @@ public class TestSentryStore extends org.junit.Assert { @Test public void testDuplicateNotification() throws Exception { Map<String, Collection<String>> authzPaths = new HashMap<>(); - Long lastNotificationId = sentryStore.getMaxNotificationID(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); lastNotificationId ++; authzPaths.put("db1.table1", Sets.newHashSet("user/hive/warehouse/db1.db/table1", @@ -3471,7 +3471,7 @@ public class TestSentryStore extends org.junit.Assert { long lastChangeID = sentryStore.getLastProcessedPathChangeID(); MSentryPathChange renamePathChange = sentryStore.getMSentryPathChangeByID(lastChangeID); assertEquals(renameUpdate.JSONSerialize(), renamePathChange.getPathChange()); - Long savedLastNotificationId = sentryStore.getMaxNotificationID(); + Long savedLastNotificationId = sentryStore.getLastProcessedNotificationID(); assertEquals(lastNotificationId.longValue(), savedLastNotificationId.longValue()); @@ -3531,7 +3531,7 @@ public class TestSentryStore extends org.junit.Assert { localSentryStore.persistFullPathsImage(new HashMap<String, Collection<String>>(), 0); // Add "db1.table1" authzObj - Long lastNotificationId = sentryStore.getMaxNotificationID(); + Long lastNotificationId = sentryStore.getLastProcessedNotificationID(); UniquePathsUpdate addUpdate = new UniquePathsUpdate("u1",1, false); addUpdate.newPathChange("db1.table"). addToAddPaths(Arrays.asList("db1", "tbl1")); @@ -3601,7 +3601,7 @@ public class TestSentryStore extends org.junit.Assert { lastChangeID = localSentryStore.getLastProcessedPathChangeID(); assertEquals(0, lastChangeID); - lastNotificationId = localSentryStore.getMaxNotificationID(); + lastNotificationId = localSentryStore.getLastProcessedNotificationID(); assertEquals(0, lastNotificationId.longValue()); // enable HDFS for other tests http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java index 3a74b70..83a1bec 100644 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java +++ b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcher.java @@ -18,20 +18,17 @@ package org.apache.sentry.service.thrift; -import static org.junit.Assert.*; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.Arrays; import java.util.Collections; import java.util.List; - -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient.NotificationFilter; import org.apache.hadoop.hive.metastore.api.NotificationEvent; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; +import org.apache.sentry.hdfs.UniquePathsUpdate; import org.apache.sentry.provider.db.service.persistent.SentryStore; import org.junit.Test; import org.mockito.Mockito; @@ -47,7 +44,7 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -66,7 +63,7 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -85,7 +82,7 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { List<NotificationEvent> events; Mockito.when(hmsClient.getNextNotification(0, Integer.MAX_VALUE, null)) @@ -113,11 +110,9 @@ public class TestHiveNotificationFetcher { Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { + try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection)) { List<NotificationEvent> events; - // Updating the Cache in the notification - fetcher.updateCache(new NotificationEvent(1L, 0, "CREATE_DATABASE", "")); /* * Requesting an ID > 0 will request all notifications from 0 again but filter those * already seen notifications with ID = 1 @@ -139,9 +134,13 @@ public class TestHiveNotificationFetcher { ); for (NotificationEvent event : events) { + String hash = UniquePathsUpdate.sha1(event); + // We simulate that CREATE_DATABASE is already processed if (event.getEventType().equals("CREATE_DATABASE")) { - Mockito.when(store.isNotificationIdProcessed(1)).thenReturn(true); + Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(true); + } else { + Mockito.when(store.isNotificationProcessed(Mockito.eq(hash))).thenReturn(false); } if (filter.accept(event)) { @@ -154,7 +153,6 @@ public class TestHiveNotificationFetcher { }); events = fetcher.fetchNotifications(1); - verify(store, times(1)).isNotificationProcessed(Mockito.anyString()); assertEquals(2, events.size()); assertEquals(1, events.get(0).getEventId()); assertEquals("CREATE_TABLE", events.get(0).getEventType()); @@ -162,200 +160,4 @@ public class TestHiveNotificationFetcher { assertEquals("ALTER_TABLE", events.get(1).getEventType()); } } - - /** - * Test verifies that any out-of-sync notifications which below the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT - * threshold will be fetched in the subsequent fetches. - * @throws Exception - */ - @Test - public void testOutofSyncNotifications() throws Exception { - SentryStore store = Mockito.mock(SentryStore.class); - HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); - HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); - - Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { - List<NotificationEvent> events; - - // This mock will also test that the NotificationFilter works as expected - Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - List<NotificationEvent> events = Arrays.<NotificationEvent>asList( - new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), - new NotificationEvent(2L, 0, "CREATE_TABLE", ""), - new NotificationEvent(3L, 0, "CREATE_TABLE", ""), - new NotificationEvent(7L, 0, "CREATE_TABLE", ""), - new NotificationEvent(8L, 0, "CREATE_TABLE", ""), - new NotificationEvent(9L, 0, "CREATE_TABLE", "") - ); - NotificationEventResponse response = new NotificationEventResponse(events); - for (NotificationEvent event : events) { - fetcher.updateCache(event); - } - return response; - } - }); - - events = fetcher.fetchNotifications(0); - assertEquals(6, events.size()); - assertEquals(1, events.get(0).getEventId()); - assertEquals("CREATE_DATABASE", events.get(0).getEventType()); - assertEquals(2, events.get(1).getEventId()); - assertEquals("CREATE_TABLE", events.get(1).getEventType()); - verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); - - reset(hmsClient); - // This mock will also test that the NotificationFilter works as expected - Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; - NotificationEventResponse response = new NotificationEventResponse(); - - List<NotificationEvent> events = Arrays.<NotificationEvent>asList( - new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), - new NotificationEvent(2L, 0, "CREATE_TABLE", ""), - new NotificationEvent(3L, 0, "CREATE_TABLE", ""), - new NotificationEvent(4L, 0, "CREATE_TABLE", ""), - new NotificationEvent(5L, 0, "CREATE_TABLE", ""), - new NotificationEvent(6L, 0, "CREATE_TABLE", ""), - new NotificationEvent(7L, 0, "CREATE_TABLE", ""), - new NotificationEvent(8L, 0, "CREATE_TABLE", ""), - new NotificationEvent(9L, 0, "CREATE_TABLE", "") - ); - - for (NotificationEvent event : events) { - // We simulate that CREATE_DATABASE is already processed - if (event.getEventId() == 9) { - Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true); - } - if (filter.accept(event)) { - response.addToEvents(event); - } - } - - return response; - } - }); - - events = fetcher.fetchNotifications(9); - assertEquals(3, events.size()); - assertEquals(4, events.get(0).getEventId()); - verify(store, times(3)).isNotificationProcessed(Mockito.anyString()); - verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); - } - } - - /** - * Test verifies that any out-of-sync notifications which above the SENTRY_HMS_NOTIFICATION_REFETCH_COUNT - * threshold will be lost and will not be fetched. - * @throws Exception - */ - @Test - public void testMissingNotifications() throws Exception { - SentryStore store = Mockito.mock(SentryStore.class); - HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); - HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); - - Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - - Configuration conf = new Configuration(); - conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3"); - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf)) { - List<NotificationEvent> events; - - // This mock will also test that the NotificationFilter works as expected - Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - List<NotificationEvent> events = Arrays.<NotificationEvent>asList( - new NotificationEvent(1L, 0, "CREATE_DATABASE", ""), - new NotificationEvent(2L, 0, "CREATE_TABLE", ""), - new NotificationEvent(8L, 0, "CREATE_TABLE", ""), - new NotificationEvent(9L, 0, "CREATE_TABLE", "") - ); - NotificationEventResponse response = new NotificationEventResponse(events); - for (NotificationEvent event : events) { - fetcher.updateCache(event); - } - return response; - } - }); - - events = fetcher.fetchNotifications(0); - assertEquals(4, events.size()); - assertEquals(1, events.get(0).getEventId()); - assertEquals("CREATE_DATABASE", events.get(0).getEventType()); - assertEquals(2, events.get(1).getEventId()); - assertEquals("CREATE_TABLE", events.get(1).getEventType()); - verify(hmsClient, times(1)).getNextNotification(Mockito.eq(0L), Mockito.anyInt(), Mockito.anyObject()); - - reset(hmsClient); - // This mock will also test that the NotificationFilter works as expected - Mockito.when(hmsClient.getNextNotification(Mockito.eq(6L), Mockito.eq(Integer.MAX_VALUE), - (NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - NotificationFilter filter = (NotificationFilter) invocation.getArguments()[2]; - NotificationEventResponse response = new NotificationEventResponse(); - - List<NotificationEvent> events = Arrays.<NotificationEvent>asList( - new NotificationEvent(7L, 0, "CREATE_TABLE", ""), - new NotificationEvent(8L, 0, "CREATE_TABLE", ""), - new NotificationEvent(9L, 0, "CREATE_TABLE", "") - ); - - for (NotificationEvent event : events) { - // We simulate that CREATE_DATABASE is already processed - if (event.getEventId() == 9) { - Mockito.when(store.isNotificationIdProcessed(9)).thenReturn(true); - } - if (filter.accept(event)) { - response.addToEvents(event); - } - } - - return response; - } - }); - - events = fetcher.fetchNotifications(9); - assertEquals(1, events.size()); - assertEquals(7, events.get(0).getEventId()); - verify(store, times(1)).isNotificationProcessed(Mockito.anyString()); - verify(hmsClient, times(1)).getNextNotification(Mockito.eq(6L), Mockito.anyInt(), Mockito.anyObject()); - } - } - - @Test - public void verifyCache() throws Exception { - SentryStore store = Mockito.mock(SentryStore.class); - HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); - HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); - - Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - Configuration conf = new Configuration(); - // With this configuration, cache size should be 9. - conf.set(ServiceConstants.ServerConfig.SENTRY_HMS_NOTIFICATION_REFETCH_COUNT, "3"); - HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, conf); - - for (int i = 0; i < 15 ; i++) { - fetcher.updateCache(new NotificationEvent(i, 0, "CREATE_DATABASE", "")); - } - - assertEquals("Invalid Cache size", 3, fetcher.getCacheSize()); - for (int i = 0; i < (15 - (3 + (3/HiveNotificationFetcher.CACHE_BUFFER_FACTOR))) ; i++) { - assertFalse("Cache entry for " + i + " should have been evicted", fetcher.isFoundInCache(i)); - } - } } http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java ---------------------------------------------------------------------- diff --git a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java b/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java deleted file mode 100644 index a81fdf4..0000000 --- a/sentry-provider/sentry-provider-db/src/test/java/org/apache/sentry/service/thrift/TestHiveNotificationFetcherCache.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - <p> - http://www.apache.org/licenses/LICENSE-2.0 - <p> - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - */ - -package org.apache.sentry.service.thrift; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; - -import com.google.common.io.Files; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.IMetaStoreClient; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; -import org.apache.hadoop.security.alias.CredentialProvider; -import org.apache.hadoop.security.alias.CredentialProviderFactory; -import org.apache.hadoop.security.alias.UserProvider; -import org.apache.sentry.provider.db.service.persistent.SentryStore; -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.Test; -import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.File; -import java.util.ArrayList; -import java.util.List; - -/** - * This class tests the notification metadata cache in HiveNotificationFetcher. - */ -public class TestHiveNotificationFetcherCache { - private static Configuration conf = null; - private static File dataDir; - private static File policyFilePath; - private static String[] adminGroups = {"adminGroup1"}; - private static char[] passwd = new char[]{'1', '2', '3'}; - private static SentryStore store; - private static HiveConnectionFactory hmsConnection = Mockito.mock(HiveConnectionFactory.class); - private static HiveMetaStoreClient hmsClient = Mockito.mock(HiveMetaStoreClient.class); - private static List<NotificationEvent> unFilteredEvents = new ArrayList<NotificationEvent>(); - - - @BeforeClass - public static void setup() throws Exception { - conf = new Configuration(false); - final String ourUrl = UserProvider.SCHEME_NAME + ":///"; - conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, ourUrl); - - // enable HDFS sync, so perm and path changes will be saved into DB - conf.set(ServiceConstants.ServerConfig.PROCESSOR_FACTORIES, "org.apache.sentry.hdfs.SentryHDFSServiceProcessorFactory"); - conf.set(ServiceConstants.ServerConfig.SENTRY_POLICY_STORE_PLUGINS, "org.apache.sentry.hdfs.SentryPlugin"); - - // THis should be a UserGroupInformation provider - CredentialProvider provider = CredentialProviderFactory.getProviders(conf).get(0); - - - // The user credentials are stored as a static variable by UserGrouoInformation provider. - // We need to only set the password the first time, an attempt to set it for the second - // time fails with an exception. - if (provider.getCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS) == null) { - provider.createCredentialEntry(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, passwd); - provider.flush(); - } - - dataDir = new File(Files.createTempDir(), "sentry_policy_db"); - conf.set(ServiceConstants.ServerConfig.SENTRY_VERIFY_SCHEM_VERSION, "false"); - conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_URL, - "jdbc:derby:;databaseName=" + dataDir.getPath() + ";create=true"); - conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_JDBC_PASS, "dummy"); - conf.setStrings(ServiceConstants.ServerConfig.ADMIN_GROUPS, adminGroups); - conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING, - ServiceConstants.ServerConfig.SENTRY_STORE_LOCAL_GROUP_MAPPING); - policyFilePath = new File(dataDir, "local_policy_file.ini"); - conf.set(ServiceConstants.ServerConfig.SENTRY_STORE_GROUP_MAPPING_RESOURCE, - policyFilePath.getPath()); - - // These tests do not need to retry transactions, so setting to 1 to reduce testing time - conf.setInt(ServiceConstants.ServerConfig.SENTRY_STORE_TRANSACTION_RETRY, 1); - - // SentryStore should be initialized only once. The tables created by the test cases will - // be cleaned up during the @After method. - store = new SentryStore(conf); - - boolean hdfsSyncEnabled = SentryServiceUtil.isHDFSSyncEnabled(conf); - store.setPersistUpdateDeltas(hdfsSyncEnabled); - Mockito.when(hmsConnection.connect()).thenReturn(new HMSClient(hmsClient)); - - Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), - (IMetaStoreClient.NotificationFilter) Mockito.isNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - NotificationEventResponse response = new NotificationEventResponse(); - for (NotificationEvent event : unFilteredEvents) { - response.addToEvents(event); - } - - return response; - } - }); - - // This mock will also test that the NotificationFilter works as expected - Mockito.when(hmsClient.getNextNotification(Mockito.eq(0L), Mockito.eq(Integer.MAX_VALUE), - (IMetaStoreClient.NotificationFilter) Mockito.notNull())).thenAnswer(new Answer<NotificationEventResponse>() { - @Override - public NotificationEventResponse answer(InvocationOnMock invocation) - throws Throwable { - IMetaStoreClient.NotificationFilter filter = (IMetaStoreClient.NotificationFilter) invocation.getArguments()[2]; - NotificationEventResponse response = new NotificationEventResponse(); - for (NotificationEvent event : unFilteredEvents) { - if (filter.accept(event)) { - response.addToEvents(event); - } - } - - return response; - } - }); - } - - @After - public void after() { - store.clearAllTables(); - unFilteredEvents.clear(); - } - - /** - * This test makes sure that there is no additional load on the database because of additional fetches done by - * notification fetcher by making sure that there are no cache miss when there are no gaps and out-of-sequence - * notifications. - * @throws Exception - */ - @Test - public void testWithNoGapsAndOutOfSequenceNotifications() throws Exception { - - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { - List<NotificationEvent> filteredEvents; - - for (int count = 1; count <= 150; count++) { - unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); - } - - filteredEvents = fetcher.fetchNotifications(0); - assertEquals(150, filteredEvents.size()); - assertEquals(1, filteredEvents.get(0).getEventId()); - assertEquals("CREATE_DATABASE", filteredEvents.get(0).getEventType()); - - filteredEvents = fetcher.fetchNotifications(150); - assertEquals(0, filteredEvents.size()); - assertEquals(0, fetcher.getNotificationCacheMissCount()); - } - } - - /** - * This test makes sure that there is no additional load on the database because of additional fetches done by - * notification fetcher by making sure that there are no cache miss even when there are gaps and out-of-sequence - * notifications. - * @throws Exception - */ - @Test - public void testWithGapsAndOutOfSequenceNotifications() throws Exception { - try (HiveNotificationFetcher fetcher = new HiveNotificationFetcher(store, hmsConnection, new Configuration())) { - List<NotificationEvent> filteredEvents = new ArrayList<NotificationEvent>(); - int count = 1; - for (int fetchCount = 1; fetchCount <= 10; fetchCount++) { - for (; count <= (fetchCount * 10); count++) { - if (!(count % 3 == 0 || count % 7 == 0) || (count %10 == 0)) { - unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); - } - } - filteredEvents = fetcher.fetchNotifications(count - 1); - } - - assertTrue("Invalid notification count", (filteredEvents.size() < 100)); - assertEquals(0, fetcher.getNotificationCacheMissCount()); - - for (count = 1; count <= 100; count++) { - if (count % 3 == 0 || count % 7 == 0) { - unFilteredEvents.add(new NotificationEvent(count, 0, "CREATE_DATABASE", "")); - } - } - fetcher.fetchNotifications(count - 1); - assertEquals(0, fetcher.getNotificationCacheMissCount()); - } - } -} http://git-wip-us.apache.org/repos/asf/sentry/blob/a178d7ed/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java ---------------------------------------------------------------------- diff --git a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java b/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java deleted file mode 100644 index 88ce7f9..0000000 --- a/sentry-tests/sentry-tests-hive/src/test/java/org/apache/sentry/tests/e2e/dbprovider/TestSnapshotCreation.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.sentry.tests.e2e.dbprovider; - -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.Statement; - -import org.apache.sentry.tests.e2e.hdfs.TestHDFSIntegrationBase; -import org.junit.Before; -import org.junit.Test; - -/** - * This class covers basic scenario of snapshot creation and makes sure that - * HMSFollower takes a full snapshot when sentry server comes up and not - * subsequently. - */ -public class TestSnapshotCreation extends TestHDFSIntegrationBase { - - private final static String tableName1 = "tb_1"; - private final static String tableName2 = "tb_2"; - private final static String tableName3 = "tb_3"; - private final static String tableName4 = "tb_4"; - - protected static final String DB1 = "db_1", - DB2 = "db_2"; - - private Connection connection; - private Statement statement; - - @Before - public void initialize() throws Exception { - super.setUpTempDir(); - admin = "hive"; - connection = hiveServer2.createConnection(admin, admin); - statement = connection.createStatement(); - statement.execute("create role admin_role"); - statement.execute("grant role admin_role to group hive"); - statement.execute("grant all on server server1 to role admin_role"); - } - - @Test - public void BasicSanity() throws Exception { - long latestSnapshotId = 0; - //Sleep for a sec allowing HMSFollower to create a snapshot - Thread.sleep(1000); - dbNames = new String[]{DB1}; - roles = new String[]{"admin_role", "all_db1", "all_tbl1", "all_tbl2"}; - do { - //Sleep for a sec allowing HMSFollower to create a snapshot - Thread.sleep(1000); - latestSnapshotId = sentryServer.get(0).getCurrentAuthzPathsSnapshotID(); - } while (latestSnapshotId == 0); - - statement.execute("CREATE DATABASE " + DB1); - statement.execute("CREATE DATABASE " + DB2); - statement.execute("create table " + DB1 + "." + tableName1 - + " (under_col int comment 'the under column', value string)"); - statement.execute("create table " + DB1 + "." + tableName2 - + " (under_col int comment 'the under column', value string)"); - - Thread.sleep(5000); - - statement.execute("create table " + DB2 + "." + tableName3 - + " (under_col int comment 'the under column', value string)"); - statement.execute("create table " + DB2 + "." + tableName4 - + " (under_col int comment 'the under column', value string)"); - - assertEquals("Another snapshot is created, Snapshot ID: ", latestSnapshotId, sentryServer.get(0).getCurrentAuthzPathsSnapshotID()); - } -}