RANGER-1897: TagSync should replace use of V1 Atlas APIs with V2 APIs for efficient tag-download from Atlas


Project: http://git-wip-us.apache.org/repos/asf/ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/e8afb9fa
Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/e8afb9fa
Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/e8afb9fa

Branch: refs/heads/master
Commit: e8afb9faad81e7042877aa528635848a0043cb0c
Parents: c7260c3
Author: Abhay Kulkarni <akulka...@hortonworks.com>
Authored: Thu Dec 7 17:29:41 2017 -0800
Committer: Abhay Kulkarni <akulka...@hortonworks.com>
Committed: Thu Dec 7 17:29:41 2017 -0800

----------------------------------------------------------------------
 .../atlas/authorizer/RangerAtlasResource.java   |   4 +
 pom.xml                                         |   4 +-
 src/main/assembly/tagsync.xml                   |  30 +-
 tagsync/pom.xml                                 |  77 ++---
 .../ranger/tagsync/process/TagSynchronizer.java |   2 +-
 .../tagsync/sink/tagadmin/TagAdminRESTSink.java |  11 +-
 .../source/atlas/AtlasEntityWithTraits.java     |  98 ------
 .../source/atlas/AtlasHbaseResourceMapper.java  |  12 +-
 .../source/atlas/AtlasHdfsResourceMapper.java   |  13 +-
 .../source/atlas/AtlasHiveResourceMapper.java   |  11 +-
 .../source/atlas/AtlasKafkaResourceMapper.java  |  17 +-
 .../source/atlas/AtlasNotificationMapper.java   | 142 ++++----
 .../source/atlas/AtlasResourceMapper.java       |  26 +-
 .../source/atlas/AtlasResourceMapperUtil.java   |  66 ++--
 .../source/atlas/AtlasStormResourceMapper.java  |  10 +-
 .../tagsync/source/atlas/AtlasTagSource.java    |  97 +++---
 .../source/atlasrest/AtlasRESTTagSource.java    | 238 ++++++++++++--
 .../tagsync/source/atlasrest/AtlasRESTUtil.java | 325 -------------------
 .../source/atlasrest/RangerAtlasEntity.java     |  60 ++++
 .../atlasrest/RangerAtlasEntityWithTags.java    | 118 +++++++
 .../process/TestHbaseResourceMapper.java        |  56 ++--
 .../tagsync/process/TestHdfsResourceMapper.java |  24 +-
 .../tagsync/process/TestHiveResourceMapper.java |  28 +-
 .../process/TestKafkaResourceMapper.java        |  16 +-
 24 files changed, 679 insertions(+), 806 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
----------------------------------------------------------------------
diff --git a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
index f056f3e..4367c5e 100644
--- a/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
+++ b/plugin-atlas/src/main/java/org/apache/ranger/authorization/atlas/authorizer/RangerAtlasResource.java
@@ -28,8 +28,10 @@ public class RangerAtlasResource extends RangerAccessResourceImpl {
     public static final String KEY_TYPE = "type";
     public static final String KEY_ENTITY = "entity";
     public static final String KEY_OPERATION = "operation";
+    /*
     public static final String KEY_TAXONOMY = "taxonomy";
     public static final String KEY_TERM = "term";
+    */
 
     private static final Logger LOG = 
LoggerFactory.getLogger(RangerAtlasResource.class);
 
@@ -44,12 +46,14 @@ public class RangerAtlasResource extends RangerAccessResourceImpl {
             case OPERATION:
                 setValue(KEY_OPERATION, atlasResource);
                 break;
+                /*
             case TAXONOMY:
                 setValue(KEY_TAXONOMY, atlasResource);
                 break;
             case TERM:
                 setValue(KEY_TERM, atlasResource);
                 break;
+                */
             default:
                 LOG.warn("Invalid Resource : " + atlasResource);
                 break;

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a73a52f..3086a1e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -125,9 +125,10 @@
         <apacheds.version>2.0.0-M22</apacheds.version>
         <asm.all.version>3.2</asm.all.version>
         <aspectj.version>1.8.2</aspectj.version>
-        <atlas.version>0.8.1</atlas.version>
+        <atlas.version>1.0.0-SNAPSHOT</atlas.version>
         <atlas.guava.version>14.0</atlas.guava.version>
         <atlas.gson.version>2.5</atlas.gson.version>
+        <atlas.jackson.version>2.9.2</atlas.jackson.version>
         <atlas.jettison.version>1.3.7</atlas.jettison.version>
         <atlas.commons.logging.version>1.1.3</atlas.commons.logging.version>
         <bouncycastle.version>1.55</bouncycastle.version>
@@ -210,6 +211,7 @@
         <springframework.version>3.2.10.RELEASE</springframework.version>
         <sqoop.version>1.99.7</sqoop.version>
         <storm.version>1.0.2</storm.version>
+        <sun-jersey-bundle.version>1.19</sun-jersey-bundle.version>
         <tomcat.embed.version>7.0.82</tomcat.embed.version>
         <velocity.version>1.7</velocity.version>
         <zookeeper.version>3.4.6</zookeeper.version>

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/src/main/assembly/tagsync.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/tagsync.xml b/src/main/assembly/tagsync.xml
index 0b17151..c929395 100644
--- a/src/main/assembly/tagsync.xml
+++ b/src/main/assembly/tagsync.xml
@@ -37,14 +37,13 @@
                                        <include>com.101tec:zkclient</include>
                                        
<include>com.google.code.gson:gson:jar:${gson.version}</include>
                                        
<include>com.google.guava:guava:jar:${google.guava.version}</include>
-                                       
<include>com.google.inject:guice:jar:${guice.version}</include>
-                                       
<include>com.google.inject.extensions:guice-multibindings:jar:${guice.version}</include>
                                        
<include>com.sun.jersey:jersey-bundle:jar:${jersey-bundle.version}</include>
-                                       
<include>com.thoughtworks.paranamer:paranamer:jar:${paranamer.version}</include>
-                                       
<include>com.yammer.metrics:metrics-core</include>
+                                       
<include>com.sun.jersey.contribs:jersey-multipart:jar:${sun-jersey-bundle.version}</include>
                                        
<include>org.apache.atlas:atlas-notification:jar:${atlas.version}</include>
-                                       
<include>org.apache.atlas:atlas-typesystem:jar:${atlas.version}</include>
-                                       
<include>org.apache.atlas:atlas-client:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-intg:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-client-v1:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-client-v2:jar:${atlas.version}</include>
+                                       
<include>org.apache.atlas:atlas-client-common:jar:${atlas.version}</include>
                                        
<include>org.apache.atlas:atlas-common:jar:${atlas.version}</include>
                                        
<include>org.apache.hadoop:hadoop-auth</include>
                                        
<include>org.apache.hadoop:hadoop-common</include>
@@ -55,20 +54,15 @@
                                        
<include>org.apache.ranger:ranger-plugins-common</include>
                                        
<include>org.apache.ranger:ranger-util</include>
                                        
<include>org.apache.zookeeper:zookeeper:jar:${zookeeper.version}</include>
-                                       
<include>org.codehaus.jackson:jackson-core-asl</include>
-                                       
<include>org.codehaus.jackson:jackson-jaxrs</include>
-                                       
<include>org.codehaus.jackson:jackson-mapper-asl</include>
-                                       
<include>org.codehaus.jackson:jackson-xc</include>
+                                       
<include>com.fasterxml.jackson.core:jackson-annotations:jar:${atlas.jackson.version}</include>
+                                       
<include>com.fasterxml.jackson.core:jackson-core:jar:${atlas.jackson.version}</include>
+                                       
<include>com.fasterxml.jackson.core:jackson-databind:jar:${atlas.jackson.version}</include>
+                                       
<include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:${atlas.jackson.version}</include>
+                                       
<include>com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:${atlas.jackson.version}</include>
+                                       
<include>org.codehaus.jackson:jackson-core-asl:jar:${codehaus.jackson.version}</include>
+                                       
<include>org.codehaus.jackson:jackson-jaxrs:jar:${codehaus.jackson.version}</include>
                                        
<include>org.codehaus.jettison:jettison:jar:${jettison.version}</include>
-                                       
<include>org.json4s:json4s-native_${scala.binary.version}:jar:${json4s.version}</include>
-                                       
<include>org.json4s:json4s-core_${scala.binary.version}:jar:${json4s.version}</include>
-                                       
<include>org.json4s:json4s-ast_${scala.binary.version}:jar:${json4s.version}</include>
                                        
<include>org.scala-lang:scala-library:jar:${scala.version}</include>
-                                       
<include>org.scala-lang:scalap:jar:${scala.version}</include>
-                                       
<include>org.scala-lang:scala-compiler:jar:${scala.version}</include>
-                                       
<include>org.scala-lang:scala-reflect:jar:${scala.version}</include>
-                                       
<include>org.scala-lang.modules:scala-xml_${scala.binary.version}:jar:${scala.xml.version}</include>
-                                       
<include>org.scala-lang.modules:scala-parser-combinators_${scala.binary.version}:jar:${scala.xml.version}</include>
                                        <include>org.slf4j:slf4j-api</include>
                                        
<include>aopalliance:aopalliance:jar:${aopalliance.version}</include>
                                        
<include>commons-cli:commons-cli:jar:${commons.cli.version}</include>

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/pom.xml
----------------------------------------------------------------------
diff --git a/tagsync/pom.xml b/tagsync/pom.xml
index 74ff155..7e53641 100644
--- a/tagsync/pom.xml
+++ b/tagsync/pom.xml
@@ -55,6 +55,11 @@
             <version>${jersey-bundle.version}</version>
         </dependency>
         <dependency>
+            <groupId>com.sun.jersey.contribs</groupId>
+            <artifactId>jersey-multipart</artifactId>
+            <version>${sun-jersey-bundle.version}</version>
+        </dependency>
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <version>${commons.cli.version}</version>
@@ -95,14 +100,14 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.google.inject</groupId>
-            <artifactId>guice</artifactId>
-            <version>${guice.version}</version>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-core-asl</artifactId>
+            <version>${codehaus.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.google.inject.extensions</groupId>
-            <artifactId>guice-multibindings</artifactId>
-            <version>${guice.version}</version>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+            <version>${codehaus.jackson.version}</version>
         </dependency>
         <dependency>
             <groupId>org.codehaus.jettison</groupId>
@@ -116,12 +121,17 @@
         </dependency>
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-typesystem</artifactId>
+            <artifactId>atlas-intg</artifactId>
+            <version>${atlas.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-client-v1</artifactId>
             <version>${atlas.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-client</artifactId>
+            <artifactId>atlas-client-v2</artifactId>
             <version>${atlas.version}</version>
         </dependency>
         <dependency>
@@ -160,53 +170,34 @@
         </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
-            <artifactId>scala-compiler</artifactId>
-            <version>${scala.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scala.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scalap</artifactId>
-            <version>${scala.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.scala-lang</groupId>
-            <artifactId>scala-reflect</artifactId>
-            <version>${scala.version}</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${atlas.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.scala-lang.modules</groupId>
-            <artifactId>scala-xml_${scala.binary.version}</artifactId>
-            <version>${scala.xml.version}</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${atlas.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.scala-lang.modules</groupId>
-            
<artifactId>scala-parser-combinators_${scala.binary.version}</artifactId>
-            <version>${scala.xml.version}</version>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-base</artifactId>
+            <version>${atlas.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-core_${scala.binary.version}</artifactId>
-            <version>${json4s.version}</version>
+            <groupId>com.fasterxml.jackson.jaxrs</groupId>
+            <artifactId>jackson-jaxrs-json-provider</artifactId>
+            <version>${atlas.jackson.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-native_${scala.binary.version}</artifactId>
-            <version>${json4s.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-ast_${scala.binary.version}</artifactId>
-            <version>${json4s.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.thoughtworks.paranamer</groupId>
-            <artifactId>paranamer</artifactId>
-            <version>${paranamer.version}</version>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${atlas.jackson.version}</version>
         </dependency>
+
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
index b07cd34..45997e4 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/process/TagSynchronizer.java
@@ -107,7 +107,7 @@ public class TagSynchronizer {
 
                if (ret) {
                        LOG.info("Initializing TAG source and sink");
-
+                       ret = false;
                        tagSink = initializeTagSink(properties);
 
                        if (tagSink != null) {

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
index c34b6ea..a1dc8f5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/sink/tagadmin/TagAdminRESTSink.java
@@ -65,13 +65,13 @@ public class TagAdminRESTSink implements TagSink, Runnable {
 
        @Override
        public boolean initialize(Properties properties) {
-               if(LOG.isDebugEnabled()) {
+               if (LOG.isDebugEnabled()) {
                        LOG.debug("==> TagAdminRESTSink.initialize()");
                }
 
                boolean ret = false;
 
-               String restUrl       = 
TagSyncConfig.getTagAdminRESTUrl(properties);
+               String restUrl = TagSyncConfig.getTagAdminRESTUrl(properties);
                String sslConfigFile = 
TagSyncConfig.getTagAdminRESTSslConfigFile(properties);
                String userName = TagSyncConfig.getTagAdminUserName(properties);
                String password = TagSyncConfig.getTagAdminPassword(properties);
@@ -89,16 +89,19 @@ public class TagAdminRESTSink implements TagSink, Runnable {
 
                if (StringUtils.isNotBlank(restUrl)) {
                        tagRESTClient = new RangerRESTClient(restUrl, 
sslConfigFile);
-                       if(!isKerberized) {
+                       if (!isKerberized) {
                                tagRESTClient.setBasicAuthInfo(userName, 
password);
                        }
+                       // Build and cache REST client. This will catch any 
errors in building REST client up-front
+                       tagRESTClient.getClient();
+
                        uploadWorkItems = new 
LinkedBlockingQueue<UploadWorkItem>();
                        ret = true;
                } else {
                        LOG.error("No value specified for property 
'ranger.tagsync.tagadmin.rest.url'!");
                }
 
-               if(LOG.isDebugEnabled()) {
+               if (LOG.isDebugEnabled()) {
                        LOG.debug("<== TagAdminRESTSink.initialize(), result=" 
+ ret);
                }
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
deleted file mode 100644
index 77dee01..0000000
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasEntityWithTraits.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlas;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class AtlasEntityWithTraits {
-
-       private final IReferenceableInstance entity;
-       private final List<IStruct> traits;
-
-       public AtlasEntityWithTraits(IReferenceableInstance entity, 
List<IStruct> traits) {
-               this.entity = entity;
-               this.traits = traits;
-       }
-
-       public IReferenceableInstance getEntity() {
-               return entity;
-       }
-
-       public List<IStruct> getAllTraits() {
-               return traits == null ? new LinkedList<IStruct>() : traits;
-       }
-
-       @Override
-       public String toString( ) {
-               StringBuilder sb = new StringBuilder();
-
-               toString(sb);
-
-               return sb.toString();
-       }
-
-       public void toString(StringBuilder sb) {
-
-               sb.append("AtlasEntityWithTraits={ ");
-
-               sb.append("Entity-Id: " + entity.getId()._getId()).append(", ")
-                               .append("Entity-Type: " + 
entity.getTypeName()).append(", ")
-                               .append("Entity-Version: " + 
entity.getId().getVersion()).append(", ")
-                               .append("Entity-State: " + 
entity.getId().getStateAsString()).append(", ");
-
-               sb.append("Entity-Values={ ");
-               try {
-                       for (Map.Entry<String, Object> entry : 
entity.getValuesMap().entrySet()) {
-                               sb.append("{").append(entry.getKey()).append(", 
").append(entry.getValue()).append("}, ");
-                       }
-               } catch (AtlasException exception) {
-                               // Ignore
-               }
-               sb.append(" }");
-
-               sb.append(", Entity-Traits={ ");
-               for (IStruct trait : traits) {
-                       try {
-                               
sb.append("{traitType=").append(trait.getTypeName()).append(", ");
-                               Map<String, Object> traitValues = 
trait.getValuesMap();
-                               sb.append("{");
-                               for (Map.Entry<String, Object> valueEntry : 
traitValues.entrySet()) {
-                                       
sb.append("{").append(valueEntry.getKey()).append(", 
").append(valueEntry.getValue()).append("}");
-                               }
-                               sb.append("}");
-
-                               sb.append(" }");
-                       } catch (AtlasException exception) {
-                               // Ignore
-                       }
-               }
-               sb.append(" }");
-
-               sb.append(" }");
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
index 8b36a31..33e804a 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHbaseResourceMapper.java
@@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
        public static final String ENTITY_TYPE_HBASE_TABLE          = 
"hbase_table";
@@ -36,10 +36,6 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
        public static final String RANGER_TYPE_HBASE_COLUMN_FAMILY  = 
"column-family";
        public static final String RANGER_TYPE_HBASE_COLUMN         = "column";
 
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME  = 
"qualifiedName";
-       public static final String QUALIFIED_NAME_DELIMITER         = "\\.";
-       public static final Character QUALIFIED_NAME_DELIMITER_CHAR    = '.';
-
        public static final String[] SUPPORTED_ENTITY_TYPES = { 
ENTITY_TYPE_HBASE_TABLE, ENTITY_TYPE_HBASE_COLUMN_FAMILY, 
ENTITY_TYPE_HBASE_COLUMN };
 
        public AtlasHbaseResourceMapper() {
@@ -47,8 +43,8 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
        }
 
        @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+       public RangerServiceResource buildResource(final RangerAtlasEntity 
entity) throws Exception {
+               String qualifiedName = 
(String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
                if (StringUtils.isEmpty(qualifiedName)) {
                        throw new Exception("attribute '" +  
ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
                }
@@ -64,7 +60,7 @@ public class AtlasHbaseResourceMapper extends AtlasResourceMapper {
                }
 
                String entityType  = entity.getTypeName();
-               String entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String entityGuid  = entity.getGuid();
                String serviceName = getRangerServiceName(clusterName);
 
                Map<String, RangerPolicyResource> elements = new 
HashMap<String, RangerPolicyResource>();

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
index 06bff90..378542c 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHdfsResourceMapper.java
@@ -22,12 +22,12 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
        public static final String ENTITY_TYPE_HDFS_PATH = "hdfs_path";
@@ -35,7 +35,6 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
 
        public static final String ENTITY_ATTRIBUTE_PATH           = "path";
        public static final String ENTITY_ATTRIBUTE_CLUSTER_NAME   = 
"clusterName";
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
 
        public static final String[] SUPPORTED_ENTITY_TYPES = { 
ENTITY_TYPE_HDFS_PATH };
 
@@ -56,10 +55,10 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
        }
 
        @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-               String path          = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_PATH, String.class);
-               String clusterName   = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_CLUSTER_NAME, String.class);
-               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+       public RangerServiceResource buildResource(final RangerAtlasEntity 
entity) throws Exception {
+               String path          = 
(String)entity.getAttributes().get(ENTITY_ATTRIBUTE_PATH);
+               String clusterName   = 
(String)entity.getAttributes().get(ENTITY_ATTRIBUTE_CLUSTER_NAME);
+               String qualifiedName = 
(String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
 
                if(StringUtils.isEmpty(path)) {
                        path = getResourceNameFromQualifiedName(qualifiedName);
@@ -81,7 +80,7 @@ public class AtlasHdfsResourceMapper extends AtlasResourceMapper {
                        }
                }
 
-               String  entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String  entityGuid  = entity.getGuid();
                String  serviceName = getRangerServiceName(clusterName);
                Boolean isExcludes  = Boolean.FALSE;
                Boolean isRecursive = Boolean.TRUE;

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
index a359622..3e0a97f 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasHiveResourceMapper.java
@@ -22,10 +22,10 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.Map;
 import java.util.HashMap;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasHiveResourceMapper extends AtlasResourceMapper {
        public static final String ENTITY_TYPE_HIVE_DB     = "hive_db";
@@ -36,9 +36,6 @@ public class AtlasHiveResourceMapper extends 
AtlasResourceMapper {
        public static final String RANGER_TYPE_HIVE_TABLE  = "table";
        public static final String RANGER_TYPE_HIVE_COLUMN = "column";
 
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
-       public static final String QUALIFIED_NAME_DELIMITER        = "\\.";
-
        public static final String[] SUPPORTED_ENTITY_TYPES = { 
ENTITY_TYPE_HIVE_DB, ENTITY_TYPE_HIVE_TABLE, ENTITY_TYPE_HIVE_COLUMN };
 
        public AtlasHiveResourceMapper() {
@@ -46,8 +43,8 @@ public class AtlasHiveResourceMapper extends 
AtlasResourceMapper {
        }
 
        @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+       public RangerServiceResource buildResource(final RangerAtlasEntity 
entity) throws Exception {
+               String qualifiedName = 
(String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
                if (StringUtils.isEmpty(qualifiedName)) {
                        throw new Exception("attribute '" +  
ENTITY_ATTRIBUTE_QUALIFIED_NAME + "' not found in entity");
                }
@@ -63,7 +60,7 @@ public class AtlasHiveResourceMapper extends 
AtlasResourceMapper {
                }
 
                String   entityType  = entity.getTypeName();
-               String   entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String   entityGuid  = entity.getGuid();
                String   serviceName = getRangerServiceName(clusterName);
                String[] resources   = 
resourceStr.split(QUALIFIED_NAME_DELIMITER);
                String   dbName      = resources.length > 0 ? resources[0] : 
null;

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
index 09ae5d1..86e37c3 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasKafkaResourceMapper.java
@@ -22,18 +22,16 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasKafkaResourceMapper extends AtlasResourceMapper {
        public static final String ENTITY_TYPE_KAFKA_TOPIC = "kafka_topic";
        public static final String RANGER_TYPE_KAFKA_TOPIC = "topic";
 
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
-
        public static final String[] SUPPORTED_ENTITY_TYPES = { 
ENTITY_TYPE_KAFKA_TOPIC };
 
        public AtlasKafkaResourceMapper() {
@@ -41,12 +39,8 @@ public class AtlasKafkaResourceMapper extends 
AtlasResourceMapper {
        }
 
        @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
-
-               if(StringUtils.isEmpty(qualifiedName)) {
-                       throwExceptionWithMessage("attribute '" + 
ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "' not found in entity");
-               }
+       public RangerServiceResource buildResource(final RangerAtlasEntity 
entity) throws Exception {
+               String qualifiedName = 
(String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
 
                String topic = getResourceNameFromQualifiedName(qualifiedName);
 
@@ -61,16 +55,17 @@ public class AtlasKafkaResourceMapper extends 
AtlasResourceMapper {
                }
 
                if(StringUtils.isEmpty(clusterName)) {
-                       throwExceptionWithMessage("Cluster name not found in 
attribute '" + ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "'");
+                       throwExceptionWithMessage("attribute '" + 
ENTITY_ATTRIBUTE_QUALIFIED_NAME +  "' not found in entity");
                }
 
+
                Map<String, RangerPolicyResource> elements = new 
HashMap<String, RangerPolicy.RangerPolicyResource>();
                Boolean isExcludes  = Boolean.FALSE;
                Boolean isRecursive = Boolean.TRUE;
 
                elements.put(RANGER_TYPE_KAFKA_TOPIC, new 
RangerPolicyResource(topic, isExcludes, isRecursive));
 
-               String  entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String  entityGuid  = entity.getGuid();
                String  serviceName = getRangerServiceName(clusterName);
 
                return new RangerServiceResource(entityGuid, serviceName, 
elements);

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index f007ae5..91cf606 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -19,11 +19,9 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.notification.entity.EntityNotification;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -34,9 +32,10 @@ import org.apache.ranger.plugin.model.RangerTag;
 import org.apache.ranger.plugin.model.RangerTagDef;
 import org.apache.ranger.plugin.model.RangerTagDef.RangerTagAttributeDef;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,7 +46,7 @@ public class AtlasNotificationMapper {
 
        private static Map<String, Long> unhandledEventTypes = new 
HashMap<String, Long>();
 
-       private static void logUnhandledEntityNotification(EntityNotification 
entityNotification) {
+       private static void logUnhandledEntityNotification(EntityNotificationV1 
entityNotification) {
 
                final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS 
= 5 * 60 * 1000; // 5 minutes
 
@@ -77,39 +76,30 @@ public class AtlasNotificationMapper {
        }
 
        @SuppressWarnings("unchecked")
-       public static ServiceTags processEntityNotification(EntityNotification 
entityNotification) {
+       public static ServiceTags 
processEntityNotification(EntityNotificationV1 entityNotification) {
 
                ServiceTags ret = null;
 
                if (isNotificationHandled(entityNotification)) {
                        try {
-                               IReferenceableInstance entity = 
entityNotification.getEntity();
-
-                               if (entity != null && 
AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName())) {
-                                       AtlasEntityWithTraits entityWithTraits 
= new AtlasEntityWithTraits(entity, entityNotification.getAllTraits());
-                                       if 
(entityNotification.getOperationType() == 
EntityNotification.OperationType.ENTITY_DELETE) {
-                                               ret = 
buildServiceTagsForEntityDeleteNotification(entityWithTraits);
-                                       } else {
-                                               if (entity.getId().getState() 
== Id.EntityState.ACTIVE) {
-                                                       ret = 
buildServiceTags(entityWithTraits, null);
-                                               } else {
-                                                       if 
(LOG.isDebugEnabled()) {
-                                                               
LOG.debug("Ignoring entityNotification for entity that is not ACTIVE: " + 
entityWithTraits);
-                                                       }
-                                               }
-                                       }
+                               RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(entityNotification);
+
+                               if (entityNotification.getOperationType() == 
EntityNotificationV1.OperationType.ENTITY_DELETE) {
+                                       ret = 
buildServiceTagsForEntityDeleteNotification(entityWithTags);
                                } else {
-                                       
logUnhandledEntityNotification(entityNotification);
+                                       ret = buildServiceTags(entityWithTags, 
null);
                                }
 
                        } catch (Exception exception) {
                                LOG.error("createServiceTags() failed!! ", 
exception);
                        }
+               } else {
+                       logUnhandledEntityNotification(entityNotification);
                }
                return ret;
        }
 
-       public static Map<String, ServiceTags> 
processAtlasEntities(List<AtlasEntityWithTraits> atlasEntities) {
+       public static Map<String, ServiceTags> 
processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) {
                Map<String, ServiceTags> ret = null;
 
                try {
@@ -121,17 +111,16 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private boolean isNotificationHandled(EntityNotification 
entityNotification) {
+       static private boolean isNotificationHandled(EntityNotificationV1 
entityNotification) {
                boolean ret = false;
 
-               EntityNotification.OperationType opType = 
entityNotification.getOperationType();
+               EntityNotificationV1.OperationType opType = 
entityNotification.getOperationType();
 
-               if(opType != null) {
+               if (opType != null) {
                        switch (opType) {
-                               case ENTITY_CREATE: {
-                                       LOG.debug("ENTITY_CREATE notification 
is not handled, as Ranger will get necessary information from any subsequent 
TRAIT_ADDED notification");
+                               case ENTITY_CREATE:
+                                       ret = 
CollectionUtils.isNotEmpty(entityNotification.getAllTraits());
                                        break;
-                               }
                                case ENTITY_UPDATE:
                                case ENTITY_DELETE:
                                case TRAIT_ADD:
@@ -142,30 +131,38 @@ public class AtlasNotificationMapper {
                                }
                                default:
                                        LOG.error(opType + ": unknown 
notification received - not handled");
+                                       break;
+                       }
+                       if (ret) {
+                               final Referenceable entity = 
entityNotification.getEntity();
+
+                               ret = entity != null
+                                               && entity.getId().getState() == 
Id.EntityState.ACTIVE
+                                               && 
AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName());
                        }
                }
 
                return ret;
        }
 
-       static private ServiceTags 
buildServiceTagsForEntityDeleteNotification(AtlasEntityWithTraits 
entityWithTraits) throws Exception {
+       static private ServiceTags 
buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags 
entityWithTags) throws Exception {
                final ServiceTags ret;
 
-               IReferenceableInstance entity = entityWithTraits.getEntity();
+               RangerAtlasEntity entity = entityWithTags.getEntity();
 
-               String guid = entity.getId()._getId();
+               String guid = entity.getGuid();
                if (StringUtils.isNotBlank(guid)) {
                        ret = new ServiceTags();
                        RangerServiceResource serviceResource = new 
RangerServiceResource();
                        serviceResource.setGuid(guid);
                        ret.getServiceResources().add(serviceResource);
                } else {
-                       ret = buildServiceTags(entityWithTraits, null);
+                       ret = buildServiceTags(entityWithTags, null);
                        if (ret != null) {
                                // tag-definitions should NOT be deleted as 
part of service-resource delete
-                               ret.setTagDefinitions(Collections.<Long, 
RangerTagDef>emptyMap());
+                               ret.setTagDefinitions(MapUtils.EMPTY_MAP);
                                // Ranger deletes tags associated with deleted 
service-resource
-                               ret.setTags(Collections.<Long, 
RangerTag>emptyMap());
+                               ret.setTags(MapUtils.EMPTY_MAP);
                        }
                }
 
@@ -176,13 +173,13 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private Map<String, ServiceTags> 
buildServiceTags(List<AtlasEntityWithTraits> entitiesWithTraits) throws 
Exception {
+       static private Map<String, ServiceTags> 
buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) throws 
Exception {
 
                Map<String, ServiceTags> ret = new HashMap<String, 
ServiceTags>();
 
-               for (AtlasEntityWithTraits element : entitiesWithTraits) {
-                       IReferenceableInstance entity = element.getEntity();
-                       if (entity != null && entity.getId().getState() == 
Id.EntityState.ACTIVE) {
+               for (RangerAtlasEntityWithTags element : entitiesWithTags) {
+                       RangerAtlasEntity entity = element.getEntity();
+                       if (entity != null) {
                                buildServiceTags(element, ret);
                        } else {
                                if (LOG.isDebugEnabled()) {
@@ -241,15 +238,15 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private ServiceTags buildServiceTags(AtlasEntityWithTraits 
entityWithTraits, Map<String, ServiceTags> serviceTagsMap) throws Exception {
+       static private ServiceTags buildServiceTags(RangerAtlasEntityWithTags 
entityWithTags, Map<String, ServiceTags> serviceTagsMap) throws Exception {
                ServiceTags            ret             = null;
-               IReferenceableInstance entity          = 
entityWithTraits.getEntity();
+               RangerAtlasEntity entity          = entityWithTags.getEntity();
                RangerServiceResource  serviceResource = 
AtlasResourceMapperUtil.getRangerServiceResource(entity);
 
                if (serviceResource != null) {
 
-                       List<RangerTag>     tags        = 
getTags(entityWithTraits);
-                       List<RangerTagDef>  tagDefs     = 
getTagDefs(entityWithTraits);
+                       List<RangerTag>     tags        = 
getTags(entityWithTags);
+                       List<RangerTagDef>  tagDefs     = 
getTagDefs(entityWithTags);
                        String              serviceName = 
serviceResource.getServiceName();
 
                        ret = createOrGetServiceTags(serviceTagsMap, 
serviceName);
@@ -279,12 +276,12 @@ public class AtlasNotificationMapper {
                                }
                        } else {
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug("Entity " + entityWithTraits 
+ " does not have any tags associated with it when full-sync is being done.");
+                                       LOG.debug("Entity " + entityWithTags + 
" does not have any tags associated with it when full-sync is being done.");
                                        LOG.debug("Will not add this entity to 
serviceTags, so that this entity, if exists,  will be removed from ranger");
                                }
                        }
                } else {
-                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getId()._getId());
+                       LOG.error("Failed to build serviceResource for entity:" 
+ entity.getGuid());
                }
 
                return ret;
@@ -307,58 +304,33 @@ public class AtlasNotificationMapper {
                return ret;
        }
 
-       static private List<RangerTag> getTags(AtlasEntityWithTraits 
entityWithTraits) {
+       static private List<RangerTag> getTags(RangerAtlasEntityWithTags 
entityWithTags) {
                List<RangerTag> ret = new ArrayList<RangerTag>();
 
-               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-                       List<IStruct> traits = entityWithTraits.getAllTraits();
-
-                       for (IStruct trait : traits) {
-                               Map<String, String> tagAttrs = new 
HashMap<String, String>();
+               if (entityWithTags != null && 
MapUtils.isNotEmpty(entityWithTags.getTags())) {
+                       Map<String, Map<String, String>> tags = 
entityWithTags.getTags();
 
-                               try {
-                                       Map<String, Object> attrs = 
trait.getValuesMap();
-
-                                       if(MapUtils.isNotEmpty(attrs)) {
-                                               for (Map.Entry<String, Object> 
attrEntry : attrs.entrySet()) {
-                                                       String attrName  = 
attrEntry.getKey();
-                                                       Object attrValue = 
attrEntry.getValue();
-
-                                                       tagAttrs.put(attrName, 
attrValue != null ? attrValue.toString() : null);
-                                               }
-                                       }
-                               } catch (AtlasException exception) {
-                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
-                               }
-
-                               ret.add(new RangerTag(null, 
trait.getTypeName(), tagAttrs, RangerTag.OWNER_SERVICERESOURCE));
+                       for (Map.Entry<String, Map<String, String>> tag : 
tags.entrySet()) {
+                               ret.add(new RangerTag(null, tag.getKey(), 
tag.getValue(), RangerTag.OWNER_SERVICERESOURCE));
                        }
                }
 
                return ret;
        }
 
-       static private List<RangerTagDef> getTagDefs(AtlasEntityWithTraits 
entityWithTraits) {
+       static private List<RangerTagDef> getTagDefs(RangerAtlasEntityWithTags 
entityWithTags) {
                List<RangerTagDef> ret = new ArrayList<RangerTagDef>();
 
-               if(entityWithTraits != null && 
CollectionUtils.isNotEmpty(entityWithTraits.getAllTraits())) {
-                       List<IStruct> traits = entityWithTraits.getAllTraits();
-
-                       for (IStruct trait : traits) {
-                               RangerTagDef tagDef = new 
RangerTagDef(trait.getTypeName(), "Atlas");
-
-                               try {
-                                       Map<String, Object> attrs = 
trait.getValuesMap();
+               if (entityWithTags != null && 
MapUtils.isNotEmpty(entityWithTags.getTags())) {
+                       Map<String, Map<String, String>> tags = 
entityWithTags.getTags();
 
-                                       if(MapUtils.isNotEmpty(attrs)) {
-                                               for (String attrName : 
attrs.keySet()) {
-                                                       
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attrName, "string"));
-                                               }
+                       for (Map.Entry<String, Map<String, String>> tag : 
tags.entrySet()) {
+                               RangerTagDef tagDef = new 
RangerTagDef(tag.getKey(), "Atlas");
+                               if (MapUtils.isNotEmpty(tag.getValue())) {
+                                       for (String attributeName : 
tag.getValue().keySet()) {
+                                               
tagDef.getAttributeDefs().add(new RangerTagAttributeDef(attributeName, 
entityWithTags.getTagAttributeType(tag.getKey(), attributeName)));
                                        }
-                               } catch (AtlasException exception) {
-                                       LOG.error("Could not get values for 
trait:" + trait.getTypeName(), exception);
                                }
-
                                ret.add(tagDef);
                        }
                }

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
index 8ececdf..5d067a5 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapper.java
@@ -20,19 +20,20 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 import java.util.Properties;
-import java.util.Map;
 
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public abstract class AtlasResourceMapper {
        private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapper.class);
 
        public static final String TAGSYNC_DEFAULT_CLUSTER_NAME = 
"ranger.tagsync.atlas.default.cluster.name";
+       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
+       public static final String QUALIFIED_NAME_DELIMITER        = "\\.";
+       public static final Character QUALIFIED_NAME_DELIMITER_CHAR    = '.';
 
        protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_PREFIX    
              = "ranger.tagsync.atlas.";
        protected static final String TAGSYNC_SERVICENAME_MAPPER_PROP_SUFFIX    
              = ".ranger.service";
@@ -73,7 +74,7 @@ public abstract class AtlasResourceMapper {
                this.defaultClusterName = properties != null ? 
properties.getProperty(TAGSYNC_DEFAULT_CLUSTER_NAME) : null;
        }
 
-       abstract public RangerServiceResource buildResource(final 
IReferenceableInstance entity) throws Exception;
+       abstract public RangerServiceResource buildResource(final 
RangerAtlasEntity entity) throws Exception;
 
        protected String getCustomRangerServiceName(String atlasInstanceName) {
                if(properties != null) {
@@ -118,21 +119,4 @@ public abstract class AtlasResourceMapper {
 
                throw new Exception(msg);
        }
-
-       static protected <T> T getEntityAttribute(IReferenceableInstance 
entity, String name, Class<T> type) {
-               T ret = null;
-
-               try {
-                       Map<String, Object> valueMap = entity.getValuesMap();
-                       ret = getAttribute(valueMap, name, type);
-               } catch (AtlasException exception) {
-                       LOG.error("Cannot get map of values for entity: " + 
entity.getId()._getId(), exception);
-               }
-
-               return ret;
-       }
-
-       static protected <T> T getAttribute(Map<String, Object> map, String 
name, Class<T> type) {
-               return type.cast(map.get(name));
-       }
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
index 40a639b..cd2cb63 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasResourceMapperUtil.java
@@ -19,7 +19,6 @@
 
 package org.apache.ranger.tagsync.source.atlas;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerServiceResource;
 
@@ -28,14 +27,13 @@ import java.util.*;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasResourceMapperUtil {
        private static final Log LOG = 
LogFactory.getLog(AtlasResourceMapperUtil.class);
 
        private static Map<String, AtlasResourceMapper> atlasResourceMappers = 
new HashMap<String, AtlasResourceMapper>();
 
-       private static final String MAPPER_NAME_DELIMITER = ",";
-
        public static boolean isEntityTypeHandled(String entityTypeName) {
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> isEntityTypeHandled(entityTypeName=" + 
entityTypeName + ")");
@@ -52,9 +50,9 @@ public class AtlasResourceMapperUtil {
                return ret;
        }
 
-       public static RangerServiceResource 
getRangerServiceResource(IReferenceableInstance atlasEntity) {
+       public static RangerServiceResource 
getRangerServiceResource(RangerAtlasEntity atlasEntity) {
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getId()._getId() +")");
+                       LOG.debug("==> getRangerServiceResource(" + 
atlasEntity.getGuid() +")");
                }
 
                RangerServiceResource resource = null;
@@ -65,59 +63,63 @@ public class AtlasResourceMapperUtil {
                        try {
                                resource = mapper.buildResource(atlasEntity);
                        } catch (Exception exception) {
-                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getId()._getId() + ": ", exception);
+                               LOG.error("Could not get serviceResource for 
atlas entity:" + atlasEntity.getGuid() + ": ", exception);
                        }
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getId()._getId() +"): resource=" + resource);
+                       LOG.debug("<== getRangerServiceResource(" + 
atlasEntity.getGuid() +"): resource=" + resource);
                }
 
                return resource;
        }
 
        static public boolean initializeAtlasResourceMappers(Properties 
properties) {
+               final String MAPPER_NAME_DELIMITER = ",";
+
                String customMapperNames = 
TagSyncConfig.getCustomAtlasResourceMappers(properties);
 
                if (LOG.isDebugEnabled()) {
                        LOG.debug("==> 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
customMapperNames + ")");
                }
+               boolean ret = true;
 
-               // Initialize the default mappers
-               initializeAtlasResourceMapper(new AtlasHiveResourceMapper(), 
properties);
-               initializeAtlasResourceMapper(new AtlasHdfsResourceMapper(), 
properties);
-               initializeAtlasResourceMapper(new AtlasHbaseResourceMapper(), 
properties);
-               initializeAtlasResourceMapper(new AtlasKafkaResourceMapper(), 
properties);
-               initializeAtlasResourceMapper(new AtlasStormResourceMapper(), 
properties);
+               List<String> mapperNames = new ArrayList<String>();
+               
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHiveResourceMapper");
+               
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHdfsResourceMapper");
+               
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasHbaseResourceMapper");
+               
mapperNames.add("org.apache.ranger.tagsync.source.atlas.AtlasKafkaResourceMapper");
 
-               // Initialize the custom mappers
-               boolean ret = true;
                if (StringUtils.isNotBlank(customMapperNames)) {
                        for (String customMapperName : 
customMapperNames.split(MAPPER_NAME_DELIMITER)) {
-                           try {
-                               Class<?> clazz = 
Class.forName(customMapperName);
-                               AtlasResourceMapper resourceMapper = 
(AtlasResourceMapper) clazz.newInstance();
-
-                               initializeAtlasResourceMapper(resourceMapper, 
properties);
-                           } catch (Exception exception) {
-                               LOG.error("Failed to create 
AtlasResourceMapper:" + customMapperName + ": ", exception);
-                               ret = false;
-                           }
+                               mapperNames.add(customMapperName.trim());
+                       }
+               }
+
+               for (String mapperName : mapperNames) {
+                       try {
+                               Class<?> clazz = Class.forName(mapperName);
+                               AtlasResourceMapper resourceMapper = 
(AtlasResourceMapper) clazz.newInstance();
+
+                               resourceMapper.initialize(properties);
+
+                               for (String entityTypeName : 
resourceMapper.getSupportedEntityTypes()) {
+                                       add(entityTypeName, resourceMapper);
+                               }
+
+                       } catch (Exception exception) {
+                               LOG.error("Failed to create 
AtlasResourceMapper:" + mapperName + ": ", exception);
+                               ret = false;
                        }
                }
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + 
customMapperNames + "): " + ret);
+                       LOG.debug("<== 
initializeAtlasResourceMappers.initializeAtlasResourceMappers(" + mapperNames + 
"): " + ret);
                }
                return ret;
        }
 
-       private static void initializeAtlasResourceMapper(AtlasResourceMapper 
resourceMapper, Properties properties) {
-           resourceMapper.initialize(properties);
-
-        for (String entityTypeName : resourceMapper.getSupportedEntityTypes()) 
{
-            atlasResourceMappers.put(entityTypeName, resourceMapper);
-        }
+       private static void add(String entityType, AtlasResourceMapper mapper) {
+               atlasResourceMappers.put(entityType, mapper);
        }
-
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
index 4ed01ca..650968d 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasStormResourceMapper.java
@@ -22,17 +22,15 @@ package org.apache.ranger.tagsync.source.atlas;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.atlas.typesystem.IReferenceableInstance;
 import org.apache.commons.lang.StringUtils;
 import org.apache.ranger.plugin.model.RangerPolicy.RangerPolicyResource;
 import org.apache.ranger.plugin.model.RangerServiceResource;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
 
 public class AtlasStormResourceMapper extends AtlasResourceMapper {
        public static final String ENTITY_TYPE_STORM_TOPOLOGY = 
"storm_topology";
        public static final String RANGER_TYPE_STORM_TOPOLOGY = "topology";
 
-       public static final String ENTITY_ATTRIBUTE_QUALIFIED_NAME = 
"qualifiedName";
-
        public static final String[] SUPPORTED_ENTITY_TYPES = { 
ENTITY_TYPE_STORM_TOPOLOGY };
 
        public AtlasStormResourceMapper() {
@@ -40,8 +38,8 @@ public class AtlasStormResourceMapper extends 
AtlasResourceMapper {
        }
 
        @Override
-       public RangerServiceResource buildResource(final IReferenceableInstance 
entity) throws Exception {
-               String qualifiedName = getEntityAttribute(entity, 
ENTITY_ATTRIBUTE_QUALIFIED_NAME, String.class);
+    public RangerServiceResource buildResource(final RangerAtlasEntity entity) 
throws Exception {
+               String qualifiedName = 
(String)entity.getAttributes().get(AtlasResourceMapper.ENTITY_ATTRIBUTE_QUALIFIED_NAME);
 
                String topology = 
getResourceNameFromQualifiedName(qualifiedName);
 
@@ -65,7 +63,7 @@ public class AtlasStormResourceMapper extends 
AtlasResourceMapper {
 
                elements.put(RANGER_TYPE_STORM_TOPOLOGY, new 
RangerPolicyResource(topology, isExcludes, isRecursive));
 
-               String entityGuid  = entity.getId() != null ? 
entity.getId()._getId() : null;
+               String entityGuid  = entity.getGuid();
                String serviceName = getRangerServiceName(clusterName);
 
                return new RangerServiceResource(entityGuid, serviceName, 
elements);

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index c382db0..8c15ee5 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -20,24 +20,24 @@
 package org.apache.ranger.tagsync.source.atlas;
 
 
-import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.kafka.NotificationProvider;
+import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationInterface;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.atlas.notification.NotificationConsumer;
-import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.entity.EntityNotification;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.ranger.tagsync.model.AbstractTagSource;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.atlas.kafka.AtlasKafkaMessage;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntityWithTags;
+
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Properties;
 import java.util.List;
+import java.util.Properties;
 
 public class AtlasTagSource extends AbstractTagSource {
        private static final Log LOG = LogFactory.getLog(AtlasTagSource.class);
@@ -100,10 +100,11 @@ public class AtlasTagSource extends AbstractTagSource {
                }
 
                if (ret) {
-            NotificationInterface notification = NotificationProvider.get();
-            List<NotificationConsumer<Object>> iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
+                       NotificationInterface notification = 
NotificationProvider.get();
+                       List<NotificationConsumer<EntityNotificationV1>> 
iterators = 
notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 
1);
+
+                       consumerTask = new ConsumerRunnable(iterators.get(0));
 
-            consumerTask = new ConsumerRunnable(iterators.get(0));
                }
 
                if (LOG.isDebugEnabled()) {
@@ -137,63 +138,59 @@ public class AtlasTagSource extends AbstractTagSource {
                }
        }
 
-       private static String getPrintableEntityNotification(EntityNotification 
notification) {
+       private static String 
getPrintableEntityNotification(EntityNotificationV1 notification) {
                StringBuilder sb = new StringBuilder();
 
                sb.append("{ Notification-Type: 
").append(notification.getOperationType()).append(", ");
-               AtlasEntityWithTraits entityWithTraits = new 
AtlasEntityWithTraits(notification.getEntity(), notification.getAllTraits());
-               sb.append(entityWithTraits.toString());
+        RangerAtlasEntityWithTags entityWithTags = new 
RangerAtlasEntityWithTags(notification);
+        sb.append(entityWithTags.toString());
+
                sb.append("}");
                return sb.toString();
        }
 
        private class ConsumerRunnable implements Runnable {
 
-               private final NotificationConsumer<Object> consumer;
+               private final NotificationConsumer<EntityNotificationV1> 
consumer;
 
-               private ConsumerRunnable(NotificationConsumer<Object> consumer) 
{
+               private 
ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) {
                        this.consumer = consumer;
                }
 
+
                @Override
                public void run() {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("==> ConsumerRunnable.run()");
                        }
                        while (true) {
-                try {
-                    List<AtlasKafkaMessage<Object>> messages = 
consumer.receive(1000L);
-                    for (AtlasKafkaMessage<Object> message : messages) {
-                        Object kafkaMessage = message != null ? 
message.getMessage() : null;
-
-                        if (kafkaMessage != null) {
-                            EntityNotification notification = null;
-                            if (kafkaMessage instanceof EntityNotification) {
-                                notification = (EntityNotification) 
kafkaMessage;
-                            } else {
-                                LOG.warn("Received Kafka notification of 
unexpected type:[" + kafkaMessage.getClass().toString() + "], Ignoring...");
-                            }
-                            if (notification != null) {
-                                if (LOG.isDebugEnabled()) {
-                                    LOG.debug("Notification=" + 
getPrintableEntityNotification(notification));
-                                }
-
-                                ServiceTags serviceTags = 
AtlasNotificationMapper.processEntityNotification(notification);
-                                if (serviceTags != null) {
-                                    updateSink(serviceTags);
-                                }
-                            }
-                            TopicPartition partition = new 
TopicPartition("ATLAS_ENTITIES", message.getPartition());
-                            consumer.commit(partition, message.getOffset());
-                        } else {
-                            LOG.error("Null message received from Kafka!! 
Ignoring..");
-                        }
-                    }
-                } catch (Exception exception) {
-                    LOG.error("Caught exception..: ", exception);
-                    return;
-                }
-            }
+                               try {
+                                       
List<AtlasKafkaMessage<EntityNotificationV1>> messages = 
consumer.receive(1000L);
+
+                                       for 
(AtlasKafkaMessage<EntityNotificationV1> message :  messages) {
+                                               EntityNotificationV1 
notification = message != null ? message.getMessage() : null;
+
+                                               if (notification != null) {
+                                                       if 
(LOG.isDebugEnabled()) {
+                                                               
LOG.debug("Notification=" + getPrintableEntityNotification(notification));
+                                                       }
+
+                                                       ServiceTags serviceTags 
= AtlasNotificationMapper.processEntityNotification(notification);
+                                                       if (serviceTags != 
null) {
+                                                               
updateSink(serviceTags);
+                                                       }
+
+                                                       TopicPartition 
partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition());
+                                                       
consumer.commit(partition, message.getOffset());
+                                               } else {
+                                                       LOG.error("Null 
entityNotification received from Kafka!! Ignoring..");
+                                               }
+                                       }
+                               } catch (Exception exception) {
+                                       LOG.error("Caught exception..: ", 
exception);
+                                       return;
+                               }
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
index 4e0ae90..b715869 100644
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
+++ 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTTagSource.java
@@ -20,35 +20,67 @@
 package org.apache.ranger.tagsync.source.atlasrest;
 
 import com.google.gson.Gson;
-
 import com.google.gson.GsonBuilder;
-
+import org.apache.atlas.AtlasClientV2;
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.SearchFilter;
+import org.apache.atlas.model.discovery.AtlasSearchResult;
+import org.apache.atlas.model.discovery.SearchParameters;
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntityHeader;
+import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
+import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.type.AtlasBuiltInTypes;
+import org.apache.atlas.type.AtlasClassificationType;
+import org.apache.atlas.type.AtlasStructType;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.tagsync.model.AbstractTagSource;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.plugin.util.ServiceTags;
+import org.apache.ranger.tagsync.model.AbstractTagSource;
 import org.apache.ranger.tagsync.model.TagSink;
 import org.apache.ranger.tagsync.process.TagSyncConfig;
 import org.apache.ranger.tagsync.process.TagSynchronizer;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
 import org.apache.ranger.tagsync.source.atlas.AtlasNotificationMapper;
 import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
 
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.TimeZone;
 
 public class AtlasRESTTagSource extends AbstractTagSource implements Runnable {
        private static final Log LOG = 
LogFactory.getLog(AtlasRESTTagSource.class);
 
-       private long sleepTimeBetweenCycleInMillis;
+    private static final ThreadLocal<DateFormat> DATE_FORMATTER = new 
ThreadLocal<DateFormat>() {
+               @Override
+               protected DateFormat initialValue() {
+                       SimpleDateFormat dateFormat = new 
SimpleDateFormat(AtlasBaseTypeDef.SERIALIZED_DATE_FORMAT_STR);
+
+                       dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-       private AtlasRESTUtil atlasRESTUtil = null;
+                       return dateFormat;
+               }
+       };
+
+       private long sleepTimeBetweenCycleInMillis;
+       private String[] restUrls         = null;
+       private boolean  isKerberized     = false;
+       private String[] userNamePassword = null;
 
        private Thread myThread = null;
 
@@ -95,30 +127,26 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
                boolean ret = 
AtlasResourceMapperUtil.initializeAtlasResourceMappers(properties);
 
                sleepTimeBetweenCycleInMillis = 
TagSyncConfig.getTagSourceAtlasDownloadIntervalInMillis(properties);
-               final boolean isKerberized = 
TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
+               isKerberized = 
TagSyncConfig.getTagsyncKerberosIdentity(properties) != null;
 
-               String restUrl       = 
TagSyncConfig.getAtlasRESTEndpoint(properties);
+               String restEndpoint       = 
TagSyncConfig.getAtlasRESTEndpoint(properties);
                String sslConfigFile = 
TagSyncConfig.getAtlasRESTSslConfigFile(properties);
-               String userName = 
TagSyncConfig.getAtlasRESTUserName(properties);
-               String password = 
TagSyncConfig.getAtlasRESTPassword(properties);
+        this.userNamePassword = new String[] { 
TagSyncConfig.getAtlasRESTUserName(properties), 
TagSyncConfig.getAtlasRESTPassword(properties) };
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("restUrl=" + restUrl);
+                       LOG.debug("restUrl=" + restEndpoint);
                        LOG.debug("sslConfigFile=" + sslConfigFile);
-                       LOG.debug("userName=" + userName);
+                       LOG.debug("userName=" + userNamePassword[0]);
                        LOG.debug("kerberized=" + isKerberized);
                }
-
-               if (StringUtils.isNotEmpty(restUrl)) {
-                       if (!restUrl.endsWith("/")) {
-                               restUrl += "/";
-                       }
-                       RangerRESTClient atlasRESTClient = new 
RangerRESTClient(restUrl, sslConfigFile);
-
-                       if (!isKerberized) {
-                               atlasRESTClient.setBasicAuthInfo(userName, 
password);
-                       }
-                       atlasRESTUtil = new AtlasRESTUtil(atlasRESTClient, 
isKerberized);
+        if (StringUtils.isNotEmpty(restEndpoint)) {
+            this.restUrls = restEndpoint.split(",");
+
+            for (int i = 0; i < restUrls.length; i++) {
+                if (!restUrls[i].endsWith("/")) {
+                    restUrls[i] += "/";
+                }
+            }
                } else {
                        LOG.info("AtlasEndpoint not specified, Initial download 
of Atlas-entities cannot be done.");
                        ret = false;
@@ -174,16 +202,15 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
        public void synchUp() {
 
-               List<AtlasEntityWithTraits> atlasEntities = 
atlasRESTUtil.getAtlasEntities();
+               List<RangerAtlasEntityWithTags> rangerAtlasEntities = 
getAtlasActiveEntities();
 
-               if (CollectionUtils.isNotEmpty(atlasEntities)) {
+               if (CollectionUtils.isNotEmpty(rangerAtlasEntities)) {
                        if (LOG.isDebugEnabled()) {
-                               for (AtlasEntityWithTraits element : 
atlasEntities) {
+                               for (RangerAtlasEntityWithTags element : 
rangerAtlasEntities) {
                                        LOG.debug(element);
                                }
                        }
-
-                       Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(atlasEntities);
+                       Map<String, ServiceTags> serviceTagsMap = 
AtlasNotificationMapper.processAtlasEntities(rangerAtlasEntities);
 
                        if (MapUtils.isNotEmpty(serviceTagsMap)) {
                                for (Map.Entry<String, ServiceTags> entry : 
serviceTagsMap.entrySet()) {
@@ -202,5 +229,158 @@ public class AtlasRESTTagSource extends AbstractTagSource 
implements Runnable {
 
        }
 
+       private List<RangerAtlasEntityWithTags> getAtlasActiveEntities() {
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("==> getAtlasActiveEntities()");
+               }
+               List<RangerAtlasEntityWithTags> ret = null;
+
+               SearchParameters searchParams = new SearchParameters();
+               AtlasTypeRegistry typeRegistry = new AtlasTypeRegistry();
+               AtlasTypeRegistry.AtlasTransientTypeRegistry tty = null;
+               AtlasSearchResult searchResult = null;
+
+               searchParams.setClassification("*");
+               searchParams.setIncludeClassificationAttributes(true);
+               searchParams.setOffset(0);
+               searchParams.setLimit(Integer.MAX_VALUE);
+
+               boolean commitUpdates = false;
+               try {
+                       AtlasClientV2 atlasClient = getAtlasClient();
+                       searchResult = atlasClient.facetedSearch(searchParams);
+                       AtlasTypesDef typesDef = atlasClient.getAllTypeDefs(new 
SearchFilter());
+                       tty = typeRegistry.lockTypeRegistryForUpdate();
+                       tty.addTypes(typesDef);
+                       commitUpdates = true;
+               } catch (AtlasServiceException | AtlasBaseException | 
IOException excp) {
+                       LOG.error("failed to download tags from Atlas", excp);
+               } catch (Exception unexpectedException) {
+                       LOG.error("Failed to download tags from Atlas due to 
unexpected exception", unexpectedException);
+               } finally {
+                       if (tty != null) {
+                               typeRegistry.releaseTypeRegistryForUpdate(tty, 
commitUpdates);
+                       }
+               }
+
+               if (commitUpdates && searchResult != null) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(AtlasType.toJson(searchResult));
+                       }
+                       ret = new ArrayList<>();
+                       List<AtlasEntityHeader> entityHeaders = 
searchResult.getEntities();
+                       if (CollectionUtils.isNotEmpty(entityHeaders)) {
+                               for (AtlasEntityHeader header : entityHeaders) {
+                                       if 
(!header.getStatus().equals(AtlasEntity.Status.ACTIVE)) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Skipping 
entity because it is not ACTIVE, header:[" + header + "]");
+                                               }
+                                               continue;
+                                       }
+
+                                       String typeName = header.getTypeName();
+                                       if 
(!AtlasResourceMapperUtil.isEntityTypeHandled(typeName)) {
+                                               if (LOG.isDebugEnabled()) {
+                                                       LOG.debug("Not fetching 
Atlas entities of type:[" + typeName + "]");
+                                               }
+                                               continue;
+                                       }
+
+                                       Map<String, Map<String, String>> 
allTagsForEntity = new HashMap<>();
+
+                                       for (AtlasClassification classification 
: header.getClassifications()) {
+                                               Map<String, Map<String, 
String>> tags = resolveTag(typeRegistry, classification.getTypeName(), 
classification.getAttributes());
+                                               if (tags != null) {
+                                                       
allTagsForEntity.putAll(tags);
+                                               }
+                                       }
+
+                                       if 
(MapUtils.isNotEmpty(allTagsForEntity)) {
+
+                                               RangerAtlasEntity entity = new 
RangerAtlasEntity(typeName, header.getGuid(), header.getAttributes());
+                                               RangerAtlasEntityWithTags 
entityWithTags = new RangerAtlasEntityWithTags(entity, allTagsForEntity, 
typeRegistry);
+                                               ret.add(entityWithTags);
+                                       }
+                               }
+                       }
+               }
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("<== getAtlasActiveEntities()");
+               }
+
+               return ret;
+       }
+
+    /*
+     * Returns a map of <tag-name, Map<attributeName, attributeValue>>
+     */
+       private Map<String, Map<String, String>> resolveTag(AtlasTypeRegistry 
typeRegistry, String typeName, Map<String, Object> attributes) {
+               Map<String, Map<String, String>> ret = new HashMap<>();
+
+               try {
+                       AtlasClassificationType classificationType = 
typeRegistry.getClassificationTypeByName(typeName);
+                       if (classificationType != null) {
+                               Map<String, String> allAttributes = new 
HashMap<>();
+                               if (MapUtils.isNotEmpty(attributes) && 
MapUtils.isNotEmpty(classificationType.getAllAttributes())) {
+                                       for (Map.Entry<String, Object> 
attribute : attributes.entrySet()) {
+                                               String name = 
attribute.getKey();
+                                               Object value = 
attribute.getValue();
+                                               if (value != null) {
+                                                       String stringValue = 
value.toString();
+                                                       
AtlasStructType.AtlasAttribute atlasAttribute = 
classificationType.getAttribute(name);
+                                                       if (atlasAttribute != 
null) {
+                                                               if (value 
instanceof Number) {
+                                                                       if 
(atlasAttribute.getAttributeType() instanceof AtlasBuiltInTypes.AtlasDateType) {
+                                                                               
stringValue = DATE_FORMATTER.get().format(value);
+                                                                       }
+                                                               }
+                                                               
allAttributes.put(name, stringValue);
+                                                       }
+                                               }
+                                       }
+                               }
+                               // Put most derived classificationType with all 
attributes
+                               ret.put(typeName, allAttributes);
+
+                               // Find base classification types
+                               Set<String> superTypeNames = 
classificationType.getAllSuperTypes();
+                               for (String superTypeName : superTypeNames) {
+                                       AtlasClassificationType superType = 
typeRegistry.getClassificationTypeByName(superTypeName);
+                                       if (superType != null) {
+                                               Map<String, String> 
attributeMap = new HashMap<>();
+                                               if 
(MapUtils.isNotEmpty(attributes) && 
MapUtils.isNotEmpty(superType.getAllAttributes())) {
+                                                       for (String name : 
superType.getAllAttributes().keySet()) {
+                                                               String 
stringValue = allAttributes.get(name);
+                                                               if (stringValue 
!= null) {
+                                                                       
attributeMap.put(name, stringValue);
+                                                               }
+                                                       }
+                                               }
+                                               ret.put(superTypeName, 
attributeMap);
+                                       }
+                               }
+                       }
+               } catch (Exception exception) {
+                       LOG.error("Error in resolving tags for type:[" + 
typeName + "]", exception);
+               }
+               return ret;
+       }
+
+       private AtlasClientV2 getAtlasClient() throws IOException {
+               final AtlasClientV2 ret;
+
+               if (isKerberized) {
+                       UserGroupInformation ugi = 
UserGroupInformation.getLoginUser();
+
+                       ugi.checkTGTAndReloginFromKeytab();
+
+                       ret = new AtlasClientV2(ugi, ugi.getShortUserName(), 
restUrls);
+               } else {
+                       ret = new AtlasClientV2(restUrls, userNamePassword);
+               }
+
+               return ret;
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/ranger/blob/e8afb9fa/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
----------------------------------------------------------------------
diff --git 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
 
b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
deleted file mode 100644
index 00a101e..0000000
--- 
a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/AtlasRESTUtil.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.tagsync.source.atlasrest;
-
-import com.google.gson.Gson;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.ranger.admin.client.datatype.RESTResponse;
-import org.apache.ranger.plugin.util.RangerRESTClient;
-import org.apache.ranger.tagsync.source.atlas.AtlasEntityWithTraits;
-import org.apache.ranger.tagsync.source.atlas.AtlasResourceMapperUtil;
-
-import java.io.IOException;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("unchecked")
-public class AtlasRESTUtil {
-       private static final Logger LOG = Logger.getLogger(AtlasRESTUtil.class);
-
-       private static final String REST_MIME_TYPE_JSON = "application/json";
-       private static final String API_ATLAS_TYPES    = "api/atlas/types";
-       private static final String API_ATLAS_ENTITIES = 
"api/atlas/entities?type=";
-       private static final String API_ATLAS_ENTITY   = "api/atlas/entities/";
-       private static final String API_ATLAS_TYPE     = "api/atlas/types/";
-
-       private static final String RESULTS_ATTRIBUTE               = "results";
-       private static final String DEFINITION_ATTRIBUTE            = 
"definition";
-       private static final String VALUES_ATTRIBUTE                = "values";
-       private static final String TRAITS_ATTRIBUTE                = "traits";
-       private static final String TYPE_NAME_ATTRIBUTE             = 
"typeName";
-       private static final String TRAIT_TYPES_ATTRIBUTE           = 
"traitTypes";
-       private static final String SUPER_TYPES_ATTRIBUTE           = 
"superTypes";
-       private static final String ATTRIBUTE_DEFINITIONS_ATTRIBUTE = 
"attributeDefinitions";
-       private static final String NAME_ATTRIBUTE                  = "name";
-
-       private final Gson gson = new Gson();
-
-       private final RangerRESTClient atlasRESTClient;
-
-       private final boolean isKerberized;
-
-       public AtlasRESTUtil(RangerRESTClient atlasRESTClient, boolean 
isKerberized) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> AtlasRESTUtil()");
-               }
-
-               this.atlasRESTClient = atlasRESTClient;
-
-               this.isKerberized = isKerberized;
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== AtlasRESTUtil()");
-               }
-       }
-
-       public List<AtlasEntityWithTraits> getAtlasEntities() {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getAtlasEntities()");
-               }
-
-               List<AtlasEntityWithTraits> ret = new 
ArrayList<AtlasEntityWithTraits>();
-
-               Map<String, Object> typesResponse = atlasAPI(API_ATLAS_TYPES);
-
-               List<String> types = getAttribute(typesResponse, 
RESULTS_ATTRIBUTE, List.class);
-
-               if (CollectionUtils.isNotEmpty(types)) {
-
-                       for (String type : types) {
-
-                               if 
(!AtlasResourceMapperUtil.isEntityTypeHandled(type)) {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("Not fetching Atlas 
entities of type: " + type);
-                                       }
-                                       continue;
-                               }
-
-                               Map<String, Object> entitiesResponse = 
atlasAPI(API_ATLAS_ENTITIES + type);
-
-                               List<String> guids = 
getAttribute(entitiesResponse, RESULTS_ATTRIBUTE, List.class);
-
-                               if (CollectionUtils.isEmpty(guids)) {
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("No Atlas entities 
for type: " + type);
-                                       }
-                                       continue;
-                               }
-
-                               for (String guid : guids) {
-
-                                       Map<String, Object> entityResponse = 
atlasAPI(API_ATLAS_ENTITY + guid);
-
-                                       Map<String, Object> definition = 
getAttribute(entityResponse, DEFINITION_ATTRIBUTE, Map.class);
-
-                                       Map<String, Object> traitsAttribute = 
getAttribute(definition, TRAITS_ATTRIBUTE, Map.class);
-
-                                       List<IStruct> allTraits = new 
LinkedList<>();
-
-                                       if 
(MapUtils.isNotEmpty(traitsAttribute)) {
-
-                                               for (Map.Entry<String, Object> 
entry : traitsAttribute.entrySet()) {
-
-                                                       Map<String, Object> 
trait = (Map<String, Object>) entry.getValue();
-
-                                                       Map<String, Object> 
traitValues = getAttribute(trait, VALUES_ATTRIBUTE, Map.class);
-                                                       String traitTypeName = 
getAttribute(trait, TYPE_NAME_ATTRIBUTE, String.class);
-
-                                                       if 
(StringUtils.isEmpty(traitTypeName)) {
-                                                               continue;
-                                                       }
-
-                                                       List<IStruct> 
superTypes = getTraitSuperTypes(getTraitType(traitTypeName), traitValues);
-
-                                                       Struct trait1 = new 
Struct(traitTypeName, traitValues);
-
-                                                       allTraits.add(trait1);
-                                                       
allTraits.addAll(superTypes);
-                                               }
-                                       }
-
-                                       IReferenceableInstance entity = 
InstanceSerialization.fromJsonReferenceable(gson.toJson(definition), true);
-
-                                       if (entity != null) {
-                                               AtlasEntityWithTraits 
atlasEntity = new AtlasEntityWithTraits(entity, allTraits);
-                                               ret.add(atlasEntity);
-                                       } else {
-                                               if (LOG.isInfoEnabled()) {
-                                                       LOG.info("Could not 
create Atlas entity from its definition, type=" + type + ", guid=" + guid);
-                                               }
-                                       }
-
-                               }
-
-                       }
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("<== getAtlasEntities()");
-                       }
-               }
-
-               return ret;
-       }
-
-       private Map<String, Object> getTraitType(String traitName) {
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getTraitType(" + traitName + ")");
-               }
-               Map<String, Object> ret = null;
-
-               Map<String, Object> typeResponse = atlasAPI(API_ATLAS_TYPE + 
traitName);
-
-               Map<String, Object> definition = getAttribute(typeResponse, 
DEFINITION_ATTRIBUTE, Map.class);
-
-               List traitTypes = getAttribute(definition, 
TRAIT_TYPES_ATTRIBUTE, List.class);
-
-               if (CollectionUtils.isNotEmpty(traitTypes)) {
-                       ret = (Map<String, Object>) traitTypes.get(0);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getTraitType(" + traitName + ")");
-               }
-               return ret;
-       }
-
-       private List<IStruct> getTraitSuperTypes(Map<String, Object> traitType, 
Map<String, Object> values) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> getTraitSuperTypes()");
-               }
-               List<IStruct> ret = new LinkedList<>();
-
-               if (traitType != null) {
-
-                       List<String> superTypeNames = getAttribute(traitType, 
SUPER_TYPES_ATTRIBUTE, List.class);
-
-                       if (CollectionUtils.isNotEmpty(superTypeNames)) {
-                               for (String superTypeName : superTypeNames) {
-
-                                       Map<String, Object> superTraitType = 
getTraitType(superTypeName);
-
-                                       if (superTraitType != null) {
-                                               List<Map<String, Object>> 
attributeDefinitions = (List) 
superTraitType.get(ATTRIBUTE_DEFINITIONS_ATTRIBUTE);
-
-                                               Map<String, Object> 
superTypeValues = new HashMap<>();
-                                               for (Map<String, Object> 
attributeDefinition : attributeDefinitions) {
-
-                                                       String attributeName = 
attributeDefinition.get(NAME_ATTRIBUTE).toString();
-                                                       if 
(values.containsKey(attributeName)) {
-                                                               
superTypeValues.put(attributeName, values.get(attributeName));
-                                                       }
-                                               }
-
-                                               List<IStruct> superTraits = 
getTraitSuperTypes(getTraitType(superTypeName), values);
-
-                                               Struct superTrait = new 
Struct(superTypeName, superTypeValues);
-
-                                               ret.add(superTrait);
-                                               ret.addAll(superTraits);
-                                       }
-                               }
-                       }
-               }
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== getTraitSuperTypes()");
-               }
-               return ret;
-       }
-
-       private Map<String, Object> atlasAPI(final String endpoint) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> atlasAPI(" + endpoint + ")");
-               }
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               try {
-                       UserGroupInformation userGroupInformation = null;
-                       if (isKerberized) {
-                               userGroupInformation = 
UserGroupInformation.getLoginUser();
-
-                               try {
-                                       
userGroupInformation.checkTGTAndReloginFromKeytab();
-                               } catch (IOException ioe) {
-                                       LOG.error("Error renewing TGT and 
relogin", ioe);
-                                       userGroupInformation = null;
-                               }
-                       }
-                       if (userGroupInformation != null) {
-                               LOG.debug("Using kerberos authentication");
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Using Principal = "+ 
userGroupInformation.getUserName());
-                               }
-                               ret = userGroupInformation.doAs(new 
PrivilegedAction<Map<String, Object>>() {
-                                       @Override
-                                       public Map<String, Object> run() {
-                                               try{
-                                                       return 
executeAtlasAPI(endpoint);
-                                               }catch (Exception e) {
-                                                       LOG.error("Atlas API 
failed with message : ", e);
-                                               }
-                                               return null;
-                                       }
-                               });
-                       } else {
-                               LOG.debug("Using basic authentication");
-                               ret = executeAtlasAPI(endpoint);
-                       }
-               } catch (Exception exception) {
-                       LOG.error("Exception when fetching Atlas objects.", 
exception);
-                       ret = null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== atlasAPI(" + endpoint + ")");
-               }
-               return ret;
-       }
-
-       private Map<String, Object> executeAtlasAPI(final String endpoint) {
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("==> executeAtlasAPI(" + endpoint + ")");
-               }
-
-               Map<String, Object> ret = new HashMap<String, Object>();
-
-               try {
-                       final WebResource webResource = 
atlasRESTClient.getResource(endpoint);
-
-                       ClientResponse response = 
webResource.accept(REST_MIME_TYPE_JSON).type(REST_MIME_TYPE_JSON).get(ClientResponse.class);
-
-                       if (response != null && response.getStatus() == 200) {
-                               ret = response.getEntity(ret.getClass());
-                       } else {
-                               RESTResponse resp = 
RESTResponse.fromClientResponse(response);
-                               LOG.error("Error getting atlas data request=" + 
webResource.toString()
-                                               + ", response=" + 
resp.toString());
-                       }
-               } catch (Exception exception) {
-                       LOG.error("Exception when fetching Atlas objects.", 
exception);
-                       ret = null;
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("<== executeAtlasAPI(" + endpoint + ")");
-               }
-
-               return ret;
-       }
-
-       private <T> T getAttribute(Map<String, Object> map, String name, 
Class<T> type) {
-               return MapUtils.isNotEmpty(map) ? type.cast(map.get(name)) : 
null;
-       }
-
-}

Reply via email to