Revert "SENTRY-1769 Refactor HMSFollower Class (Kalyan Kumar Kalvagadda 
reviewed by Vamsee Yarlagadda, Na Li, Sergio Pena and  Alexander Kolbasov)"

This reverts commit e5bb466efc621318c69f4a929dea3e39a77962af.


Project: http://git-wip-us.apache.org/repos/asf/sentry/repo
Commit: http://git-wip-us.apache.org/repos/asf/sentry/commit/c56f48cc
Tree: http://git-wip-us.apache.org/repos/asf/sentry/tree/c56f48cc
Diff: http://git-wip-us.apache.org/repos/asf/sentry/diff/c56f48cc

Branch: refs/heads/sentry-ha-redesign
Commit: c56f48cc62e5b346d6c8432ccfa07d7f9fa4f110
Parents: e5bb466
Author: Kalyan Kumar Kalvagadda <kkal...@cloudera.com>
Authored: Thu Jul 13 18:57:20 2017 -0500
Committer: Kalyan Kumar Kalvagadda <kkal...@cloudera.com>
Committed: Thu Jul 13 18:57:20 2017 -0500

----------------------------------------------------------------------
 .../apache/sentry/hdfs/PathImageRetriever.java  |  13 +-
 .../org/apache/sentry/hdfs/SentryPlugin.java    |  40 +-
 .../provider/db/SentryPolicyStorePlugin.java    |   4 +-
 .../db/service/model/MAuthzPathsSnapshotId.java |   2 +-
 .../service/model/MSentryHmsNotification.java   |   8 +-
 .../db/service/persistent/PathsImage.java       |  10 +-
 .../db/service/persistent/SentryStore.java      |  13 +-
 .../thrift/SentryPolicyStoreProcessor.java      |   5 +-
 .../sentry/service/thrift/HMSFollower.java      | 714 ++++++++++++++-----
 .../service/thrift/NotificationProcessor.java   | 571 ++++-----------
 .../sentry/service/thrift/SentryHmsClient.java  | 244 -------
 .../sentry/service/thrift/SentryService.java    |  17 +-
 .../service/thrift/SentryServiceUtil.java       |  35 +-
 .../sentry/service/thrift/ServiceConstants.java |   2 +-
 .../db/service/persistent/TestSentryStore.java  |   1 +
 .../sentry/service/thrift/TestHMSFollower.java  | 348 +++------
 .../thrift/TestNotificationProcessor.java       | 465 ------------
 .../service/thrift/TestSentryHmsClient.java     | 470 ------------
 .../TestDbPrivilegeCleanupOnDrop.java           |   4 +-
 19 files changed, 820 insertions(+), 2146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
----------------------------------------------------------------------
diff --git 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
index ac5c5b2..2426b40 100644
--- 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
+++ 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/PathImageRetriever.java
@@ -25,14 +25,9 @@ import 
org.apache.sentry.provider.db.service.persistent.PathsImage;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 
 import javax.annotation.concurrent.ThreadSafe;
-
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.sentry.hdfs.service.thrift.TPathChanges;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
-import org.apache.sentry.provider.db.service.persistent.SentryStore;
 
 /**
  * PathImageRetriever obtains a complete snapshot of Hive Paths from a 
persistent
@@ -42,10 +37,10 @@ import 
org.apache.sentry.provider.db.service.persistent.SentryStore;
  * It is a thread safe class, as all the underlying database operation is 
thread safe.
  */
 @ThreadSafe
-class PathImageRetriever implements ImageRetriever<PathsUpdate> {
+public class PathImageRetriever implements ImageRetriever<PathsUpdate> {
 
-  private static final String[] root = {"/"};
   private final SentryStore sentryStore;
+  private static final String[] root = {"/"};
 
   PathImageRetriever(SentryStore sentryStore) {
     this.sentryStore = sentryStore;
@@ -60,8 +55,8 @@ class PathImageRetriever implements 
ImageRetriever<PathsUpdate> {
       // persistent storage, along with the sequence number of latest
       // delta change the snapshot corresponds to.
       PathsImage pathsImage = sentryStore.retrieveFullPathsImage();
+      long curSeqNum = pathsImage.getCurSeqNum();
       long curImgNum = pathsImage.getCurImgNum();
-      long curSeqNum = pathsImage.getId();
       Map<String, Set<String>> pathImage = pathsImage.getPathImage();
 
       // Translates the complete Hive paths snapshot into a PathsUpdate.
@@ -78,7 +73,7 @@ class PathImageRetriever implements 
ImageRetriever<PathsUpdate> {
       }
 
       SentryHdfsMetricsUtil.getPathChangesHistogram.update(pathsUpdate
-          .getPathChanges().size());
+            .getPathChanges().size());
 
       // Translate PathsUpdate that contains a full image to TPathsDump for
       // consumer (NN) to be able to quickly construct UpdateableAuthzPaths

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
----------------------------------------------------------------------
diff --git 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
index 0c3ba5b..d6100de 100644
--- 
a/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
+++ 
b/sentry-hdfs/sentry-hdfs-service/src/main/java/org/apache/sentry/hdfs/SentryPlugin.java
@@ -23,14 +23,12 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
 import org.apache.sentry.core.common.utils.SigUtils;
 import org.apache.sentry.hdfs.ServiceConstants.ServerConfig;
 import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
 import org.apache.sentry.hdfs.service.thrift.TRoleChanges;
 import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
-import org.apache.sentry.service.thrift.SentryServiceUtil;
 import 
org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
 import 
org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleDeleteGroupsRequest;
 import 
org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleGrantPrivilegeRequest;
@@ -40,7 +38,7 @@ import 
org.apache.sentry.provider.db.service.thrift.TDropSentryRoleRequest;
 import org.apache.sentry.provider.db.service.thrift.TRenamePrivilegesRequest;
 import org.apache.sentry.provider.db.service.thrift.TSentryGroup;
 import org.apache.sentry.provider.db.service.thrift.TSentryPrivilege;
-import org.apache.sentry.service.thrift.HmsFollower;
+import org.apache.sentry.service.thrift.HMSFollower;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +56,7 @@ import static 
org.apache.sentry.hdfs.service.thrift.sentry_hdfs_serviceConstants
    * <ol>
    * <li>
    * Whenever updates happen on HMS, a corresponding notification log is 
generated,
-   * and {@link HmsFollower} will process the notification event and persist 
it in database.
+   * and {@link HMSFollower} will process the notification event and persist 
it in database.
    * <li>
    * The NameNode periodically asks Sentry for updates. Sentry may return zero
    * or more updates previously received via HMS notification log.
@@ -242,22 +240,16 @@ public class SentryPlugin implements 
SentryPolicyStorePlugin, SigUtils.SigListen
 
   @Override
   public Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
-      throws SentryPluginException, SentryInvalidInputException{
-    String oldAuthz = null;
-    String newAuthz = null;
-    try {
-      oldAuthz = SentryServiceUtil.getAuthzObj(request.getOldAuthorizable());
-      newAuthz = SentryServiceUtil.getAuthzObj(request.getNewAuthorizable());
-    } catch (SentryInvalidInputException failure) {
-      LOGGER.error("onRenameSentryPrivilege, Could not rename sentry privilege 
", failure);
-      throw failure;
-    }
+      throws SentryPluginException {
+    String oldAuthz = HMSFollower.getAuthzObj(request.getOldAuthorizable());
+    String newAuthz = HMSFollower.getAuthzObj(request.getNewAuthorizable());
     PermissionsUpdate update = new PermissionsUpdate();
     TPrivilegeChanges privUpdate = 
update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
     privUpdate.putToAddPrivileges(newAuthz, newAuthz);
     privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
 
-    LOGGER.debug("onRenameSentryPrivilege, Authz Perm preUpdate [ {} ]", 
oldAuthz);
+    LOGGER.debug(String.format("onRenameSentryPrivilege, Authz Perm preUpdate 
[ %s ]",
+                  oldAuthz));
     return update;
   }
 
@@ -291,7 +283,8 @@ public class SentryPlugin implements 
SentryPolicyStorePlugin, SigUtils.SigListen
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         roleName, privilege.getAction().toUpperCase());
 
-    LOGGER.debug("onAlterSentryRoleRevokePrivilegeCore, Authz Perm preUpdate [ 
{} ]", authzObj);
+    LOGGER.debug(String.format("onAlterSentryRoleRevokePrivilegeCore, Authz 
Perm preUpdate [ %s ]",
+                  authzObj));
     return update;
   }
 
@@ -303,7 +296,8 @@ public class SentryPlugin implements 
SentryPolicyStorePlugin, SigUtils.SigListen
         request.getRoleName(), PermissionsUpdate.ALL_AUTHZ_OBJ);
     
update.addRoleUpdate(request.getRoleName()).addToDelGroups(PermissionsUpdate.ALL_GROUPS);
 
-    LOGGER.debug("onDropSentryRole, Authz Perm preUpdate [ {} ]", 
request.getRoleName());
+    LOGGER.debug(String.format("onDropSentryRole, Authz Perm preUpdate [ %s ]",
+                  request.getRoleName()));
     return update;
   }
 
@@ -311,18 +305,12 @@ public class SentryPlugin implements 
SentryPolicyStorePlugin, SigUtils.SigListen
   public Update onDropSentryPrivilege(TDropPrivilegesRequest request)
       throws SentryPluginException {
     PermissionsUpdate update = new PermissionsUpdate();
-    String authzObj = null;
-    try {
-       authzObj = SentryServiceUtil.getAuthzObj(request.getAuthorizable());
-    } catch (SentryInvalidInputException failure) {
-      LOGGER.error("onDropSentryPrivilege, Could not drop sentry privilege "
-        + failure.toString(), failure);
-      throw new SentryPluginException(failure.getMessage(), failure);
-    }
+    String authzObj = HMSFollower.getAuthzObj(request.getAuthorizable());
     update.addPrivilegeUpdate(authzObj).putToDelPrivileges(
         PermissionsUpdate.ALL_ROLES, PermissionsUpdate.ALL_ROLES);
 
-    LOGGER.debug("onDropSentryPrivilege, Authz Perm preUpdate [ {} ]", 
authzObj);
+    LOGGER.debug(String.format("onDropSentryPrivilege, Authz Perm preUpdate [ 
%s ]",
+                  authzObj));
     return update;
   }
 

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
index a22b422..5b8a572 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/SentryPolicyStorePlugin.java
@@ -19,7 +19,6 @@
 package org.apache.sentry.provider.db;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.sentry.core.common.exception.SentryInvalidInputException;
 import org.apache.sentry.core.common.exception.SentryUserException;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
 import 
org.apache.sentry.provider.db.service.thrift.TAlterSentryRoleAddGroupsRequest;
@@ -69,8 +68,7 @@ public interface SentryPolicyStorePlugin {
 
   Update onDropSentryRole(TDropSentryRoleRequest tRequest) throws 
SentryPluginException;
 
-  Update onRenameSentryPrivilege(TRenamePrivilegesRequest request)
-      throws SentryPluginException, SentryInvalidInputException;
+  Update onRenameSentryPrivilege(TRenamePrivilegesRequest request) throws 
SentryPluginException;
 
   Update onDropSentryPrivilege(TDropPrivilegesRequest request) throws 
SentryPluginException;
 }

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
index d8d54f3..d683c2c 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MAuthzPathsSnapshotId.java
@@ -22,7 +22,7 @@ import javax.jdo.annotations.PrimaryKey;
 
 /**
  * This class is used to persist new authz paths snapshots IDs. An authz path 
snapshot ID is required by
- * the MAuthzPathsMapping to detect new HMS snapshots created by the 
HmsFollower.
+ * the MAuthzPathsMapping to detect new HMS snapshots created by the 
HMSFollower.
  */
 @PersistenceCapable
 public class MAuthzPathsSnapshotId {

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
index 166bec7..0d54548 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/model/MSentryHmsNotification.java
@@ -20,13 +20,15 @@ package org.apache.sentry.provider.db.service.model;
 /**
  * Database backend store for HMS Notification ID's. All the notifications 
that are processed
  * by sentry are stored.
- * <p>
+ */
+
+/*
  * <p> HMS notification ID's are stored in separate table for three reasons</p>
  * <ol>
  * <li>SENTRY_PATH_CHANGE is not updated for every notification that is 
received from HMS. There
- * are cases where HmsFollower doesn't process notifications and skip's them. 
Depending on
+ * are cases where HMSFollower doesn't process notifications and skip's them. 
Depending on
  * SENTRY_PATH_CHANGE information may not provide the last notification 
processed.</li>
- * <li> There could be cases where HmsFollower thread in multiple sentry 
servers acting as a
+ * <li> There could be cases where HMSFollower thread in multiple sentry 
servers acting as a
  * leader and process HMS notifications. we need to avoid processing the 
notifications
  * multiple times. This can be made sure by always having some number of 
notification
  * information always regardless of purging interval.</li>

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
index 409a557..4d852e6 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/PathsImage.java
@@ -31,17 +31,17 @@ public class PathsImage {
 
   // A full image of hiveObj to Paths mapping.
   private final Map<String, Set<String>> pathImage;
-  private final long id;
+  private final long curSeqNum;
   private final long curImgNum;
 
-  public PathsImage(Map<String, Set<String>> pathImage, long id, long 
curImgNum) {
+  public PathsImage(Map<String, Set<String>> pathImage, long curSeqNum, long 
curImgNum) {
     this.pathImage = pathImage;
-    this.id = id;
+    this.curSeqNum = curSeqNum;
     this.curImgNum = curImgNum;
   }
 
-  public long getId() {
-    return id;
+  public long getCurSeqNum() {
+    return curSeqNum;
   }
 
   public long getCurImgNum() {

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
index 7b02e2c..979e45b 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/persistent/SentryStore.java
@@ -107,7 +107,7 @@ import static 
org.apache.sentry.provider.db.service.persistent.QueryParamBuilder
  * single node and rely on DB for multi-node synchronization.
  * <p>
  * This isn't much of a problem for path updates since they are
- * driven by HmsFollower which usually runs on a single leader
+ * driven by HMSFollower which usually runs on a single leader
  * node, but permission updates originate from clients
  * directly and may be highly concurrent.
  * <p>
@@ -151,7 +151,7 @@ public class SentryStore {
   private static final long COUNT_VALUE_UNKNOWN = -1L;
 
   // Representation for unknown HMS notification ID
-  public static final long NOTIFICATION_UNKNOWN = -1L;
+  private static final long NOTIFICATION_UNKNOWN = -1L;
 
   private static final Set<String> ALL_ACTIONS = 
Sets.newHashSet(AccessConstants.ALL,
       AccessConstants.SELECT, AccessConstants.INSERT, AccessConstants.ALTER,
@@ -169,8 +169,8 @@ public class SentryStore {
   private final TransactionManager tm;
 
   /**
-   * counterWait is used to synchronize notifications between Thrift and 
HmsFollower.
-   * Technically it doesn't belong here, but the only thing that connects 
HmsFollower
+   * counterWait is used to synchronize notifications between Thrift and 
HMSFollower.
+   * Technically it doesn't belong here, but the only thing that connects 
HMSFollower
    * and Thrift API is SentryStore. An alternative could be a singleton 
CounterWait or
    * some factory that returns CounterWait instances keyed by name, but this 
complicates
    * things unnecessary.
@@ -2674,7 +2674,7 @@ public class SentryStore {
   /**
    * Persist an up-to-date HMS snapshot into Sentry DB in a single transaction.
    *
-   * @param authzPaths paths to be be persisted
+   * @param authzPaths Mapping of hiveObj to &lt Paths &lt
    * @throws Exception
    */
   public void persistFullPathsImage(final Map<String, Set<String>> authzPaths) 
throws Exception {
@@ -2685,6 +2685,7 @@ public class SentryStore {
 
           long snapshotID = getCurrentAuthzPathsSnapshotID(pm);
           long nextSnapshotID = snapshotID + 1;
+
           pm.makePersistent(new MAuthzPathsSnapshotId(nextSnapshotID));
           for (Map.Entry<String, Set<String>> authzPath : 
authzPaths.entrySet()) {
             pm.makePersistent(new MAuthzPathsMapping(nextSnapshotID, 
authzPath.getKey(), authzPath.getValue()));
@@ -3703,7 +3704,7 @@ public class SentryStore {
    *
    * @param pm the PersistenceManager
    * @return EMPTY_NOTIFICATION_ID(0) when there are no notifications 
processed.
-   * else  last NotificationID processed by HmsFollower
+   * else  last NotificationID processed by HMSFollower
    */
   static Long getLastProcessedNotificationIDCore(
       PersistenceManager pm) {

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
index cfd0e30..ad23334 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/provider/db/service/thrift/SentryPolicyStoreProcessor.java
@@ -945,10 +945,7 @@ public class SentryPolicyStoreProcessor implements 
SentryPolicyService.Iface {
     } catch (SentryThriftAPIMismatchException e) {
       LOGGER.error(e.getMessage(), e);
       response.setStatus(Status.THRIFT_VERSION_MISMATCH(e.getMessage(), e));
-    } catch (SentryInvalidInputException e) {
-      response.setStatus(Status.InvalidInput(e.getMessage(), e));
-    }
-    catch (Exception e) {
+    } catch (Exception e) {
       String msg = "Unknown error for request: " + request + ", message: "
           + e.getMessage();
       LOGGER.error(msg, e);

http://git-wip-us.apache.org/repos/asf/sentry/blob/c56f48cc/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
----------------------------------------------------------------------
diff --git 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
index a9d05b1..1b6ae18 100644
--- 
a/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
+++ 
b/sentry-provider/sentry-provider-db/src/main/java/org/apache/sentry/service/thrift/HMSFollower.java
@@ -1,275 +1,641 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-  <p>
-  http://www.apache.org/licenses/LICENSE-2.0
-  <p>
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.sentry.service.thrift;
 
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.codahale.metrics.Timer.Context;
 import com.google.common.annotations.VisibleForTesting;
-
-import java.net.SocketException;
-
-import java.util.Collection;
-import javax.jdo.JDODataStoreException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hive.hcatalog.messaging.HCatEventMessage;
 import org.apache.sentry.binding.hive.conf.HiveAuthzConf;
-import org.apache.sentry.provider.db.service.persistent.PathsImage;
+import org.apache.sentry.core.common.exception.SentryInvalidHMSEventException;
+import org.apache.sentry.core.common.exception.SentryInvalidInputException;
+import org.apache.sentry.core.common.exception.SentryNoSuchObjectException;
+import org.apache.sentry.hdfs.PermissionsUpdate;
+import org.apache.sentry.hdfs.service.thrift.TPrivilegeChanges;
+import org.apache.sentry.provider.db.SentryPolicyStorePlugin;
 import org.apache.sentry.provider.db.service.persistent.SentryStore;
+import org.apache.sentry.provider.db.service.thrift.SentryMetrics;
+import org.apache.sentry.provider.db.service.thrift.TSentryAuthorizable;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.sentry.binding.metastore.messaging.json.*;
+
+import javax.jdo.JDODataStoreException;
+import javax.security.auth.login.LoginException;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_CREATE_WITH_POLICY_STORE;
+import static 
org.apache.sentry.binding.hive.conf.HiveAuthzConf.AuthzConfVars.AUTHZ_SYNC_DROP_WITH_POLICY_STORE;
+import static org.apache.sentry.hdfs.Updateable.Update;
 
 /**
- * HmsFollower is the thread which follows the Hive MetaStore state changes 
from Sentry.
+ * HMSFollower is the thread which follows the Hive MetaStore state changes 
from Sentry.
  * It gets the full update and notification logs from HMS and applies it to
  * update permissions stored in Sentry using SentryStore and also update the 
&lt obj,path &gt state
  * stored for HDFS-Sentry sync.
  */
-public class HmsFollower implements Runnable, AutoCloseable {
-
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(HmsFollower.class);
-  private static boolean connectedToHms = false;
-  private final SentryHmsClient client;
+@SuppressWarnings("PMD")
+public class HMSFollower implements Runnable, AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(HMSFollower.class);
+  private HiveSimpleConnectionFactory hiveConnectionFactory;
+  // Track the latest eventId of the event that has been logged. So we don't 
log the same message
+  private long lastLoggedEventId = SentryStore.EMPTY_CHANGE_ID;
+  private static boolean connectedToHMS = false;
+  private HMSClient client;
   private final Configuration authzConf;
   private final SentryStore sentryStore;
-  private final NotificationProcessor notificationProcessor;
+  private String hiveInstance;
 
+  private boolean needLogHMSSupportReady = true;
   private final LeaderStatusMonitor leaderMonitor;
 
-  /**
-   * Configuring Hms Follower thread.
-   *
-   * @param conf sentry configuration
-   * @param store sentry store
-   * @param leaderMonitor singleton instance of LeaderStatusMonitor
-   */
-  HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor 
leaderMonitor,
-              HiveSimpleConnectionFactory hiveConnectionFactory) {
-    this(conf, store, leaderMonitor, hiveConnectionFactory, null);
-  }
+  private static final String SNAPSHOT = "snapshot";
+  /** Measures time to get full snapshot */
+  private final Timer updateTimer = SentryMetrics.getInstance()
+      .getTimer(name(FullUpdateInitializer.class, SNAPSHOT));
+  /** Number of times update failed */
+  private final Counter failedSnapshotsCount = SentryMetrics.getInstance()
+      .getCounter(name(FullUpdateInitializer.class, "failed"));
 
-  @VisibleForTesting
-  /**
-   * Constructor should be used only for testing purposes.
-   *
-   * @param conf sentry configuration
-   * @param store sentry store
-   * @param leaderMonitor
-   * @param authServerName Server that sentry is Authorizing
-   */
-  HmsFollower(Configuration conf, SentryStore store, LeaderStatusMonitor 
leaderMonitor,
-              HiveSimpleConnectionFactory hiveConnectionFactory, String 
authServerName) {
-    LOGGER.info("HmsFollower is being initialized");
+  HMSFollower(Configuration conf, SentryStore store, LeaderStatusMonitor 
leaderMonitor,
+              HiveSimpleConnectionFactory hiveConnectionFactory) {
     authzConf = conf;
     this.leaderMonitor = leaderMonitor;
     sentryStore = store;
-   if (authServerName == null) {
-     HiveConf hiveConf = new HiveConf();
-     authServerName = 
hiveConf.get(HiveAuthzConf.AuthzConfVars.AUTHZ_SERVER_NAME.getVar());
-   }
-    notificationProcessor = new NotificationProcessor(sentryStore, 
authServerName, authzConf);
-    client = new SentryHmsClient(authzConf, hiveConnectionFactory);
+    this.hiveConnectionFactory = hiveConnectionFactory;
+  }
+
+  @VisibleForTesting
+  HMSFollower(Configuration conf, SentryStore sentryStore, String hiveInstance)
+      throws IOException, LoginException {
+    this(conf, sentryStore, null, null);
+    this.hiveInstance = hiveInstance;
+    hiveConnectionFactory = new HiveSimpleConnectionFactory(conf, new 
HiveConf());
+    hiveConnectionFactory.init();
   }
 
   @VisibleForTesting
-  public static boolean isConnectedToHms() {
-    return connectedToHms;
+  public static boolean isConnectedToHMS() {
+    return connectedToHMS;
   }
 
   @Override
   public void close() {
-    if (client != null) {
-      // Close any outstanding connections to HMS
-      try {
-        client.disconnect();
-      } catch (Exception failure) {
-        LOGGER.error("Failed to close the Sentry Hms Client", failure);
-      }
+    // Close any outstanding connections to HMS
+    closeHMSConnection();
+    try {
+      hiveConnectionFactory.close();
+    } catch (Exception e) {
+      LOGGER.error("failed to close Hive Connection Factory", e);
     }
   }
 
+  /**
+   * Returns HMS Client if successful, returns null if HMS is not ready yet to 
take connections
+   * Throws @LoginException if Kerberos context creation failed using Sentry's 
kerberos credentials
+   * Throws @MetaException if there was a problem on creating an HMSClient
+   */
+  private HiveMetaStoreClient getMetaStoreClient()
+    throws IOException, InterruptedException, MetaException {
+    if (client == null) {
+      client = hiveConnectionFactory.connect();
+      connectedToHMS = true;
+    }
+    return client.getClient();
+  }
+
   @Override
   public void run() {
-    long lastProcessedNotificationId;
+    Long lastProcessedNotificationID;
     try {
-      // Initializing lastProcessedNotificationId based on the latest 
persisted notification ID.
-      lastProcessedNotificationId = 
sentryStore.getLastProcessedNotificationID();
+      // Initializing lastProcessedNotificationID based on the latest 
persisted notification ID.
+      lastProcessedNotificationID = 
sentryStore.getLastProcessedNotificationID();
     } catch (Exception e) {
-      LOGGER.error("Failed to get the last processed notification id from 
sentry store, "
-          + "Skipping the processing", e);
+      LOGGER.error("Failed to get the last processed notification id from 
sentry store, " +
+        "Skipping the processing", e);
       return;
     }
     // Wake any clients connected to this service waiting for HMS already 
processed notifications.
-    wakeUpWaitingClientsForSync(lastProcessedNotificationId);
+    wakeUpWaitingClientsForSync(lastProcessedNotificationID);
     // Only the leader should listen to HMS updates
     if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
       // Close any outstanding connections to HMS
-      close();
+      closeHMSConnection();
       return;
     }
-    syncupWithHms(lastProcessedNotificationId);
+    processHiveMetastoreUpdates();
   }
 
   /**
-   * Processes new Hive Metastore notifications.
-   *
-   * <p>If no notifications are processed yet, then it
-   * does a full initial snapshot of the Hive Metastore followed by new 
notifications updates that
-   * could have happened after it.
+   * Wakes up HMS waiters waiting for a specific event notification.
    *
-   * <p>Clients connections waiting for an event notification will be
-   * woken up afterwards.
+   * @param eventID
    */
-  private void syncupWithHms(long notificationId) {
-    try {
-      client.connect();
-      connectedToHms = true;
-    } catch (Throwable e) {
-      LOGGER.error("HmsFollower cannot connect to HMS!!", e);
-      return;
+  private void wakeUpWaitingClientsForSync(long eventID) {
+    CounterWait counterWait = sentryStore.getCounterWait();
+
+    // Wake up any HMS waiters that are waiting for this ID.
+    // counterWait should never be null, but tests mock SentryStore and a 
mocked one
+    // doesn't have it.
+    if (counterWait != null) {
+      counterWait.update(eventID);
     }
+  }
 
+  /**
+   * Processes new Hive Metastore notifications.
+   *
+   * If no notifications are processed yet, then it does a full initial 
snapshot of the Hive Metastore
+   * followed by new notifications updates that could have happened after it.
+   *
+   * Clients connections waiting for an event notification will be woken up 
afterwards.
+   */
+  private void processHiveMetastoreUpdates() {
     try {
-      long lastProcessedNotificationId = notificationId;
-      // Create a full HMS snapshot if there is none
       // Decision of taking full snapshot is based on AuthzPathsMapping 
information persisted
-      // in the sentry persistent store. If AuthzPathsMapping is empty, 
snapshot is needed.
+      // in the sentry persistent store. If AuthzPathsMapping is empty, 
shapshot is needed.
+      Long lastProcessedNotificationID;
       if (sentryStore.isAuthzPathsMappingEmpty()) {
-        lastProcessedNotificationId = createFullSnapshot();
-        if (lastProcessedNotificationId == SentryStore.EMPTY_NOTIFICATION_ID) {
+        // TODO: expose time used for full update in the metrics
+
+        // To ensure point-in-time snapshot consistency, need to make sure
+        // there were no HMS updates while retrieving the snapshot.
+        // In detail the logic is:
+        //
+        // 1. Read current HMS notification ID_initial
+        // 2. Read HMS metadata state
+        // 3. Read current notification ID_new
+        // 4. If ID_initial != ID_new then the attempts for retrieving full 
HMS snapshot
+        // will be dropped. A new attempts will be made after 500 milliseconds 
when
+        // HMSFollower run again.
+
+        CurrentNotificationEventId eventIDBefore = 
getMetaStoreClient().getCurrentNotificationEventId();
+        LOGGER.info("Before fetching hive full snapshot, Current 
NotificationID = {}", eventIDBefore);
+
+        Map<String, Set<String>> pathsFullSnapshot = fetchFullUpdate();
+        if(pathsFullSnapshot.isEmpty()) {
+          LOGGER.info("Hive full snapshot is Empty. Perhaps, HMS does not have 
any data");
           return;
         }
+
+        CurrentNotificationEventId eventIDAfter = 
getMetaStoreClient().getCurrentNotificationEventId();
+        LOGGER.info("After fetching hive full snapshot, Current NotificationID 
= {}", eventIDAfter);
+
+        if (!eventIDBefore.equals(eventIDAfter)) {
+          LOGGER.error("Fail to get a point-in-time hive full snapshot. 
Current ID = {}",
+            eventIDAfter);
+          return;
+        }
+
+        LOGGER.info("Successfully fetched hive full snapshot, Current 
NotificationID = {}",
+          eventIDAfter);
+        // As eventIDAfter is the last event that was processed, eventIDAfter 
is used to update
+        // lastProcessedNotificationID instead of getting it from persistent 
store.
+        lastProcessedNotificationID = eventIDAfter.getEventId();
+        sentryStore.persistFullPathsImage(pathsFullSnapshot);
+        
sentryStore.persistLastProcessedNotificationID(eventIDAfter.getEventId());
+        // Wake up any HMS waiters that could have been put on hold before 
getting the eventIDBefore value.
+        wakeUpWaitingClientsForSync(lastProcessedNotificationID);
+      } else {
+        // Every time HMSFollower is scheduled to run, value should be updates 
based
+        // on the value stored in database.
+        lastProcessedNotificationID = 
sentryStore.getLastProcessedNotificationID();
+      }
+
+      // HMSFollower connected to HMS and it finished full snapshot if that 
was required
+      // Log this message only once
+      if (needLogHMSSupportReady && connectedToHMS) {
+        LOGGER.info("Sentry HMS support is ready");
+        needLogHMSSupportReady = false;
+      }
+
+      // HIVE-15761: Currently getNextNotification API may return an empty
+      // NotificationEventResponse causing TProtocolException.
+      // Workaround: Only processes the notification events newer than the 
last updated one.
+      CurrentNotificationEventId eventId = 
getMetaStoreClient().getCurrentNotificationEventId();
+      LOGGER.debug("Last Notification in HMS {} lastProcessedNotificationID is 
{}",
+        eventId.getEventId(), lastProcessedNotificationID);
+      if (eventId.getEventId() > lastProcessedNotificationID) {
+        NotificationEventResponse response =
+          
getMetaStoreClient().getNextNotification(lastProcessedNotificationID, 
Integer.MAX_VALUE, null);
+        if (response.isSetEvents()) {
+          if (!response.getEvents().isEmpty()) {
+            if (lastProcessedNotificationID != lastLoggedEventId) {
+              // Only log when there are updates and the notification ID has 
changed.
+              LOGGER.debug("lastProcessedNotificationID = {}. Processing {} 
events",
+                      lastProcessedNotificationID, 
response.getEvents().size());
+              lastLoggedEventId = lastProcessedNotificationID;
+            }
+
+            processNotificationEvents(response.getEvents());
+          }
+        }
       }
-      // Get the new notification from HMS and process them.
-      
processNotifications(client.getNotifications(lastProcessedNotificationId));
     } catch (TException e) {
-      // If the underlying exception is around socket exception,
-      // it is better to retry connection to HMS
+      // If the underlying exception is around socket exception, it is better 
to retry connection to HMS
       if (e.getCause() instanceof SocketException) {
-        LOGGER.error("Encountered Socket Exception during fetching 
Notification entries,"
-            + " will attempt to reconnect to HMS after configured interval", 
e);
-        close();
+        LOGGER.error("Encountered Socket Exception during fetching 
Notification entries, will reconnect to HMS", e);
+        client.invalidate();
+        closeHMSConnection();
       } else {
-        LOGGER.error("ThriftException occurred communicating with HMS", e);
+        LOGGER.error("ThriftException occured fetching Notification entries, 
will try", e);
       }
+    } catch (SentryInvalidInputException | SentryInvalidHMSEventException e) {
+      LOGGER.error("Encounter 
SentryInvalidInputException|SentryInvalidHMSEventException " +
+        "while processing notification log", e);
     } catch (Throwable t) {
       // catching errors to prevent the executor to halt.
-      LOGGER.error("Exception in HmsFollower! Caused by: " + t.getMessage(),
-          t);
+      LOGGER.error("Caught unexpected exception in HMSFollower! Caused by: " + 
t.getMessage(),
+        t.getCause());
+      t.printStackTrace();
     }
   }
 
   /**
-   * Request for full snapshot and persists it if there is no snapshot 
available in the
-   * sentry store. Also, wakes-up any waiting clients.
-   *
-   * @return ID of last notification processed.
-   * @throws Exception if there are failures
+   * Function to close HMS connection and any associated kerberos context (if 
applicable)
    */
-  private long createFullSnapshot() throws Exception {
-    LOGGER.debug("Attempting to take full HMS snapshot");
-    PathsImage snapshotInfo = client.getFullSnapshot();
-    if (snapshotInfo.getPathImage().isEmpty()) {
-      return snapshotInfo.getId();
-    }
+  private void closeHMSConnection() {
     try {
-      LOGGER.debug("Persisting HMS path full snapshot");
-      sentryStore.persistFullPathsImage(snapshotInfo.getPathImage());
-      sentryStore.persistLastProcessedNotificationID(snapshotInfo.getId());
-    } catch (Exception failure) {
-      LOGGER.error("Received exception while persisting HMS path full snapshot 
");
-      throw failure;
+      if (client != null) {
+        LOGGER.info("Closing the HMS client connection");
+        client.close();
+        connectedToHMS = false;
+      }
+    } finally {
+      client = null;
     }
-    // Wake up any HMS waiters that could have been put on hold before getting 
the
-    // eventIDBefore value.
-    wakeUpWaitingClientsForSync(snapshotInfo.getId());
-    // HmsFollower connected to HMS and it finished full snapshot if that was 
required
-    // Log this message only once
-    LOGGER.info("Sentry HMS support is ready");
-    return snapshotInfo.getId();
   }
 
   /**
-   * Process the collection of notifications and wake up any waiting clients.
-   * Also, persists the notification ID regardless of processing result.
+   * Retrieve a Hive full snapshot from HMS.
    *
-   * @param events list of event to be processed
-   * @throws Exception if the complete notification list is not processed 
because of JDO Exception
+   * @return HMS snapshot. Snapshot consists of a mapping from auth object name
+   * to the set of paths corresponding to that name.
+   * @throws InterruptedException
+   * @throws TException
+   * @throws ExecutionException
    */
-  void processNotifications(Collection<NotificationEvent> events) throws 
Exception {
-    boolean isNotificationProcessed;
-    if (events.isEmpty()) {
-      return;
+  private Map<String, Set<String>> fetchFullUpdate()
+    throws TException, ExecutionException {
+    LOGGER.info("Request full HMS snapshot");
+    try (FullUpdateInitializer updateInitializer =
+                 new FullUpdateInitializer(hiveConnectionFactory, authzConf);
+             Context context = updateTimer.time()) {
+      Map<String, Set<String>> pathsUpdate = 
updateInitializer.getFullHMSSnapshot();
+      LOGGER.info("Obtained full HMS snapshot");
+      return pathsUpdate;
+    } catch (Exception ignored) {
+      failedSnapshotsCount.inc();
+      // Caller will retry later
+      return Collections.emptyMap();
     }
+  }
+
+  private boolean syncWithPolicyStore(HiveAuthzConf.AuthzConfVars syncConfVar) 
{
+    return "true"
+        .equalsIgnoreCase((authzConf.get(syncConfVar.getVar(), 
syncConfVar.getDefault())));
+  }
+
+  /**
+   * Throws SentryInvalidHMSEventException if Notification event contains 
insufficient information
+   */
+  void processNotificationEvents(List<NotificationEvent> events) throws 
Exception {
+    SentryJSONMessageDeserializer deserializer = new 
SentryJSONMessageDeserializer();
+
+    boolean isNotificationProcessingSkipped = false;
     for (NotificationEvent event : events) {
-      isNotificationProcessed = false;
+      String dbName;
+      String tableName;
+      String oldLocation;
+      String newLocation;
+      String location;
+      List<String> locations;
+      NotificationProcessor notificationProcessor = new 
NotificationProcessor(sentryStore, LOGGER);
       try {
-        // Only the leader should process the notifications
-        if ((leaderMonitor != null) && !leaderMonitor.isLeader()) {
-          return;
+        LOGGER.debug("Processing notification with id {} and type {}", 
event.getEventId(),
+          event.getEventType());
+        switch (HCatEventMessage.EventType.valueOf(event.getEventType())) {
+          case CREATE_DATABASE:
+            SentryJSONCreateDatabaseMessage message =
+              deserializer.getCreateDatabaseMessage(event.getMessage());
+            dbName = message.getDB();
+            location = message.getLocation();
+            if ((dbName == null) || (location == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create database event " +
+                  "has incomplete information. dbName = %s location = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(location, "null")));
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+              dropSentryDbPrivileges(dbName, event);
+            }
+            notificationProcessor.processCreateDatabase(dbName, location, 
event.getEventId());
+            break;
+          case DROP_DATABASE:
+            SentryJSONDropDatabaseMessage dropDatabaseMessage =
+              deserializer.getDropDatabaseMessage(event.getMessage());
+            dbName = dropDatabaseMessage.getDB();
+            location = dropDatabaseMessage.getLocation();
+            if (dbName == null) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error("Drop database event has incomplete information: 
dbName = null");
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+              dropSentryDbPrivileges(dbName, event);
+            }
+            notificationProcessor.processDropDatabase(dbName, location, 
event.getEventId());
+            break;
+          case CREATE_TABLE:
+            SentryJSONCreateTableMessage createTableMessage = 
deserializer.getCreateTableMessage(event.getMessage());
+            dbName = createTableMessage.getDB();
+            tableName = createTableMessage.getTable();
+            location = createTableMessage.getLocation();
+            if ((dbName == null) || (tableName == null) || (location == null)) 
{
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create table event " + "has 
incomplete information."
+                  + " dbName = %s, tableName = %s, location = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null"),
+                StringUtils.defaultIfBlank(location, "null")));
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_CREATE_WITH_POLICY_STORE)) {
+              dropSentryTablePrivileges(dbName, tableName, event);
+            }
+            notificationProcessor.processCreateTable(dbName, tableName, 
location, event.getEventId());
+            break;
+          case DROP_TABLE:
+            SentryJSONDropTableMessage dropTableMessage = 
deserializer.getDropTableMessage(event.getMessage());
+            dbName = dropTableMessage.getDB();
+            tableName = dropTableMessage.getTable();
+            if ((dbName == null) || (tableName == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Drop table event " +
+                  "has incomplete information. dbName = %s, tableName = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null")));
+              break;
+            }
+            if (syncWithPolicyStore(AUTHZ_SYNC_DROP_WITH_POLICY_STORE)) {
+              dropSentryTablePrivileges(dbName, tableName, event);
+            }
+            notificationProcessor.processDropTable(dbName, tableName, 
event.getEventId());
+            break;
+          case ALTER_TABLE:
+            SentryJSONAlterTableMessage alterTableMessage = 
deserializer.getAlterTableMessage(event.getMessage());
+
+            String oldDbName = alterTableMessage.getDB();
+            String oldTableName = alterTableMessage.getTable();
+            String newDbName = event.getDbName();
+            String newTableName = event.getTableName();
+            oldLocation = alterTableMessage.getOldLocation();
+            newLocation = alterTableMessage.getNewLocation();
+
+            if ((oldDbName == null) ||
+              (oldTableName == null) ||
+              (newDbName == null) ||
+              (newTableName == null) ||
+              (oldLocation == null) ||
+              (newLocation == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Alter table event " +
+                  "has incomplete information. oldDbName = %s, oldTableName = 
%s, oldLocation = %s, " +
+                  "newDbName = %s, newTableName = %s, newLocation = %s",
+                StringUtils.defaultIfBlank(oldDbName, "null"),
+                StringUtils.defaultIfBlank(oldTableName, "null"),
+                StringUtils.defaultIfBlank(oldLocation, "null"),
+                StringUtils.defaultIfBlank(newDbName, "null"),
+                StringUtils.defaultIfBlank(newTableName, "null"),
+                StringUtils.defaultIfBlank(newLocation, "null")));
+              break;
+            } else if ((oldDbName.equalsIgnoreCase(newDbName)) &&
+              (oldTableName.equalsIgnoreCase(newTableName)) &&
+              (oldLocation.equalsIgnoreCase(newLocation))) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.info(String.format("Alter table notification ignored as 
neither name nor " +
+                "location has changed: oldAuthzObj = %s, oldLocation = %s, 
newAuthzObj = %s, " +
+                "newLocation = %s", oldDbName + "." + oldTableName , 
oldLocation,
+                newDbName + "." + newTableName, newLocation));
+              break;
+            }
+
+            if (!newDbName.equalsIgnoreCase(oldDbName) || 
!oldTableName.equalsIgnoreCase(newTableName)) {
+              // Name has changed
+              try {
+                renamePrivileges(oldDbName, oldTableName, newDbName, 
newTableName);
+              } catch (SentryNoSuchObjectException e) {
+                LOGGER.info("Rename Sentry privilege ignored as there are no 
privileges on the table: %s.%s",
+                  oldDbName, oldTableName);
+              } catch (Exception e) {
+                isNotificationProcessingSkipped = true;
+                LOGGER.info("Could not process Alter table event. Event: " + 
event.toString(), e);
+                break;
+              }
+            }
+            notificationProcessor.processAlterTable(oldDbName, newDbName, 
oldTableName,
+              newTableName, oldLocation, newLocation, event.getEventId());
+            break;
+          case ADD_PARTITION:
+            SentryJSONAddPartitionMessage addPartitionMessage =
+              deserializer.getAddPartitionMessage(event.getMessage());
+            dbName = addPartitionMessage.getDB();
+            tableName = addPartitionMessage.getTable();
+            locations = addPartitionMessage.getLocations();
+            if ((dbName == null) || (tableName == null) || (locations == 
null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Create table event has incomplete 
information. " +
+                  "dbName = %s, tableName = %s, locations = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null"),
+                locations != null ? locations.toString() : "null"));
+              break;
+            }
+            notificationProcessor.processAddPartition(dbName, tableName, 
locations, event.getEventId());
+            break;
+          case DROP_PARTITION:
+            SentryJSONDropPartitionMessage dropPartitionMessage =
+              deserializer.getDropPartitionMessage(event.getMessage());
+            dbName = dropPartitionMessage.getDB();
+            tableName = dropPartitionMessage.getTable();
+            locations = dropPartitionMessage.getLocations();
+            if ((dbName == null) || (tableName == null) || (locations == 
null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Drop partition event " +
+                  "has incomplete information. dbName = %s, tableName = %s, 
location = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null"),
+                locations != null ? locations.toString() : "null"));
+              break;
+            }
+            notificationProcessor.processDropPartition(dbName, tableName, 
locations, event.getEventId());
+
+            break;
+          case ALTER_PARTITION:
+            SentryJSONAlterPartitionMessage alterPartitionMessage =
+              deserializer.getAlterPartitionMessage(event.getMessage());
+            dbName = alterPartitionMessage.getDB();
+            tableName = alterPartitionMessage.getTable();
+            oldLocation = alterPartitionMessage.getOldLocation();
+            newLocation = alterPartitionMessage.getNewLocation();
+
+            if ((dbName == null) ||
+              (tableName == null) ||
+              (oldLocation == null) ||
+              (newLocation == null)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.error(String.format("Alter partition event " +
+                  "has incomplete information. dbName = %s, tableName = %s, " +
+                  "oldLocation = %s, newLocation = %s",
+                StringUtils.defaultIfBlank(dbName, "null"),
+                StringUtils.defaultIfBlank(tableName, "null"),
+                StringUtils.defaultIfBlank(oldLocation, "null"),
+                StringUtils.defaultIfBlank(newLocation, "null")));
+              break;
+            } else if (oldLocation.equalsIgnoreCase(newLocation)) {
+              isNotificationProcessingSkipped = true;
+              LOGGER.info(String.format("Alter partition notification ignored 
as" +
+                "location has not changed: AuthzObj = %s, Location = %s", 
dbName + "." +
+                "." + tableName, oldLocation));
+              break;
+            }
+
+            notificationProcessor.processAlterPartition(dbName, tableName, 
oldLocation,
+              newLocation, event.getEventId());
+            break;
+          case INSERT:
+            // TODO DO we need to do anything here?
+            break;
         }
-        isNotificationProcessed = 
notificationProcessor.processNotificationEvent(event);
       } catch (Exception e) {
         if (e.getCause() instanceof JDODataStoreException) {
-          LOGGER.info("Received JDO Storage Exception, Could be because of 
processing "
-              + "duplicate notification");
+          LOGGER.info("Received JDO Storage Exception, Could be because of 
processing " +
+            "duplicate notification");
           if (event.getEventId() <= 
sentryStore.getLastProcessedNotificationID()) {
             // Rest of the notifications need not be processed.
-            LOGGER.error("Received event with Id: {} which is smaller then the 
ID "
-                + "persisted in store", event.getEventId());
-            break;
+            throw e;
           }
-        } else {
-          LOGGER.error("Processing the notification with ID:{} failed with 
exception {}",
-              event.getEventId(), e);
         }
+        sentryStore.persistLastProcessedNotificationID(event.getEventId());
       }
-      if (!isNotificationProcessed) {
-        try {
-          // Update the notification ID in the persistent store even when the 
notification is
-          // not processed as the content in in the notification is not valid.
-          // Continue processing the next notification.
-          LOGGER.debug("Explicitly Persisting Notification ID:{}", 
event.getEventId());
-          sentryStore.persistLastProcessedNotificationID(event.getEventId());
-        } catch (Exception failure) {
-          LOGGER.error("Received exception while persisting the notification 
ID "
-              + event.getEventId());
-          throw failure;
-        }
+      if (isNotificationProcessingSkipped) {
+        // Update the notification ID in the persistent store even when the 
notification is
+        // not processed as the content in in the notification is not valid.
+        // Continue processing the next notification.
+        sentryStore.persistLastProcessedNotificationID(event.getEventId());
+        isNotificationProcessingSkipped = false;
       }
       // Wake up any HMS waiters that are waiting for this ID.
       wakeUpWaitingClientsForSync(event.getEventId());
     }
   }
 
-  /**
-   * Wakes up HMS waiters waiting for a specific event notification.
-   *
-   * @param eventId Id of a notification
-   */
-  private void wakeUpWaitingClientsForSync(long eventId) {
-    CounterWait counterWait = sentryStore.getCounterWait();
+  private void dropSentryDbPrivileges(String dbName, NotificationEvent event) 
throws Exception {
+    try {
+      TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+      authorizable.setDb(dbName);
+      sentryStore.dropPrivilege(authorizable, 
onDropSentryPrivilege(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on 
the database: %s", dbName);
+    } catch (Exception e) {
+      throw new SentryInvalidInputException("Could not process Drop database 
event." +
+        "Event: " + event.toString(), e);
+    }
+  }
 
-    // Wake up any HMS waiters that are waiting for this ID.
-    // counterWait should never be null, but tests mock SentryStore and a 
mocked one
-    // doesn't have it.
-    if (counterWait != null) {
-      counterWait.update(eventId);
+  private void dropSentryTablePrivileges(String dbName, String tableName, 
NotificationEvent event) throws Exception {
+    try {
+      TSentryAuthorizable authorizable = new TSentryAuthorizable(hiveInstance);
+      authorizable.setDb(dbName);
+      authorizable.setTable(tableName);
+      sentryStore.dropPrivilege(authorizable, 
onDropSentryPrivilege(authorizable));
+    } catch (SentryNoSuchObjectException e) {
+      LOGGER.info("Drop Sentry privilege ignored as there are no privileges on 
the table: %s.%s", dbName, tableName);
+    } catch (Exception e) {
+      throw new SentryInvalidInputException("Could not process Drop table 
event. Event: " + event.toString(), e);
+    }
+  }
+
+  private void renamePrivileges(String oldDbName, String oldTableName, String 
newDbName, String newTableName) throws
+    Exception {
+    TSentryAuthorizable oldAuthorizable = new 
TSentryAuthorizable(hiveInstance);
+    oldAuthorizable.setDb(oldDbName);
+    oldAuthorizable.setTable(oldTableName);
+    TSentryAuthorizable newAuthorizable = new 
TSentryAuthorizable(hiveInstance);
+    newAuthorizable.setDb(newDbName);
+    newAuthorizable.setTable(newTableName);
+    Update update =
+      onRenameSentryPrivilege(oldAuthorizable, newAuthorizable);
+    sentryStore.renamePrivilege(oldAuthorizable, newAuthorizable, update);
+  }
+
+  @VisibleForTesting
+  static Update onDropSentryPrivilege(TSentryAuthorizable authorizable) {
+    PermissionsUpdate update = new 
PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    String authzObj = getAuthzObj(authorizable);
+    
update.addPrivilegeUpdate(authzObj).putToDelPrivileges(PermissionsUpdate.ALL_ROLES,
 PermissionsUpdate.ALL_ROLES);
+    return update;
+  }
+
+  @VisibleForTesting
+  static Update onRenameSentryPrivilege(TSentryAuthorizable oldAuthorizable,
+                                        TSentryAuthorizable newAuthorizable)
+    throws SentryPolicyStorePlugin.SentryPluginException {
+    String oldAuthz = getAuthzObj(oldAuthorizable);
+    String newAuthz = getAuthzObj(newAuthorizable);
+    PermissionsUpdate update = new 
PermissionsUpdate(SentryStore.INIT_CHANGE_ID, false);
+    TPrivilegeChanges privUpdate = 
update.addPrivilegeUpdate(PermissionsUpdate.RENAME_PRIVS);
+    privUpdate.putToAddPrivileges(newAuthz, newAuthz);
+    privUpdate.putToDelPrivileges(oldAuthz, oldAuthz);
+    return update;
+  }
+
+  public static String getAuthzObj(TSentryAuthorizable authzble) {
+    String authzObj = null;
+    if (!SentryStore.isNULL(authzble.getDb())) {
+      String dbName = authzble.getDb();
+      String tblName = authzble.getTable();
+      if (SentryStore.isNULL(tblName)) {
+        authzObj = dbName;
+      } else {
+        authzObj = dbName + "." + tblName;
+      }
     }
+    return authzObj == null ? null : authzObj.toLowerCase();
   }
 }

Reply via email to