ATLAS-2870: Improvement to AddClassification transform to use filters.

Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e4414155
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e4414155
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e4414155

Branch: refs/heads/branch-1.0
Commit: e4414155070602650db466480e59b3aa5b02ea7f
Parents: 83156a2
Author: Ashutosh Mestry <ames...@hortonworks.com>
Authored: Tue Sep 11 17:06:43 2018 -0700
Committer: Ashutosh Mestry <ames...@hortonworks.com>
Committed: Thu Nov 1 15:42:55 2018 -0700

----------------------------------------------------------------------
 .../atlas/repository/impexp/ImportService.java  |  2 +-
 .../repository/impexp/ImportTransformer.java    | 58 +++++++++++++++++++-
 .../impexp/ImportTransformsShaper.java          | 16 +++++-
 .../repository/impexp/ImportTransformsTest.java | 29 +++++++++-
 4 files changed, 97 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
index 095f60f..a88ba2b 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java
@@ -112,7 +112,7 @@ public class ImportService {
             return;
         }
 
-        importTransformsShaper.shape(importTransform);
+        importTransformsShaper.shape(importTransform, 
source.getExportResult().getRequest());
 
         source.setImportTransform(importTransform);
         if(LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
index 70117f6..7bc3536 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java
@@ -21,10 +21,14 @@ import org.apache.atlas.AtlasErrorCode;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.commons.lang.StringUtils;
+import scala.Tuple3;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 
 
 public abstract class ImportTransformer {
@@ -71,8 +75,8 @@ public abstract class ImportTransformer {
         } else if (key.equals(TRANSFORMER_SET_DELETED)) {
             ret = new SetDeleted();
         } else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) {
-            String name = (params == null || params.length < 1) ? "" : 
StringUtils.join(params, ":", 1, params.length);
-            ret = new AddClassification(name);
+            String name = (params == null || params.length < 1) ? "" : 
params[1];
+            ret = new AddClassification(name, (params != null && params.length 
== 3) ? params[2] : "");
         } else {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error 
creating ImportTransformer. Unknown transformer: {}.", transformerSpec);
         }
@@ -151,12 +155,22 @@ public abstract class ImportTransformer {
     }
 
     static class AddClassification extends ImportTransformer {
+        private static final String FILTER_SCOPE_TOP_LEVEL = "topLevel";
+
+        private final String scope;
         private final String classificationName;
+        private List<AtlasObjectId> filters;
 
-        public AddClassification(String name) {
+        public AddClassification(String name, String scope) {
             super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION);
 
             this.classificationName = name;
+            this.scope = scope;
+            filters = new ArrayList<>();
+        }
+
+        public void addFilter(AtlasObjectId objectId) {
+            filters.add(objectId);
         }
 
         @Override
@@ -166,6 +180,10 @@ public abstract class ImportTransformer {
             }
 
             AtlasEntity entity = (AtlasEntity) o;
+            if(!passThruFilters(entity)) {
+                return o;
+            }
+
             if(entity.getClassifications() == null) {
                 entity.setClassifications(new 
ArrayList<AtlasClassification>());
             }
@@ -180,6 +198,40 @@ public abstract class ImportTransformer {
             return entity;
         }
 
+        private boolean passThruFilters(AtlasEntity entity) {
+            if(StringUtils.isEmpty(scope) || 
!scope.equals(FILTER_SCOPE_TOP_LEVEL)) {
+                return true;
+            }
+
+            for (AtlasObjectId filter : filters) {
+                if(isMatch(filter, entity)) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+
+        private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) {
+            boolean ret = true;
+            if (StringUtils.isEmpty(objectId.getGuid())) {
+                ret = Objects.equals(objectId.getTypeName(), 
entity.getTypeName());
+                if (ret) {
+                    for (Map.Entry<String, Object> entry : 
objectId.getUniqueAttributes().entrySet()) {
+                        ret = ret && 
Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue());
+                        if (!ret) {
+                            break;
+                        }
+                    }
+                }
+
+                return ret;
+
+            } else {
+                return Objects.equals(objectId.getGuid(), entity.getGuid());
+            }
+        }
+
         @Override
         public String toString() {
             return String.format("%s=%s", "AddClassification", 
classificationName);

http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
----------------------------------------------------------------------
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
index 62eba45..ce0c8f1 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java
@@ -19,6 +19,8 @@
 package org.apache.atlas.repository.impexp;
 
 import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.typedef.AtlasClassificationDef;
 import org.apache.atlas.model.typedef.AtlasTypesDef;
 import org.apache.atlas.store.AtlasTypeDefStore;
@@ -32,6 +34,7 @@ import javax.inject.Inject;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @Component
 public class ImportTransformsShaper {
@@ -46,12 +49,12 @@ public class ImportTransformsShaper {
         this.typeDefStore = typeDefStore;
     }
 
-    public void shape(ImportTransforms importTransform) throws 
AtlasBaseException {
-        getCreateClassifications(importTransform);
+    public void shape(ImportTransforms importTransform, AtlasExportRequest 
request) throws AtlasBaseException {
+        getCreateClassifications(importTransform, request);
         updateTransformsWithSubTypes(importTransform);
     }
 
-    private void getCreateClassifications(ImportTransforms importTransform) 
throws AtlasBaseException {
+    private void getCreateClassifications(ImportTransforms importTransform, 
AtlasExportRequest request) throws AtlasBaseException {
         Map<String, Map<String, List<ImportTransformer>>> mapMapList = 
importTransform.getTransforms();
         for (Map<String, List<ImportTransformer>> mapList : 
mapMapList.values()) {
             for (List<ImportTransformer> list : mapList.values()) {
@@ -59,6 +62,7 @@ public class ImportTransformsShaper {
                     if((importTransformer instanceof 
ImportTransformer.AddClassification)) {
 
                         ImportTransformer.AddClassification addClassification 
= (ImportTransformer.AddClassification) importTransformer;
+                        addFilters(request, addClassification);
                         
getCreateTag(addClassification.getClassificationName());
                     }
                 }
@@ -66,6 +70,12 @@ public class ImportTransformsShaper {
         }
     }
 
+    private void addFilters(AtlasExportRequest request, 
ImportTransformer.AddClassification transformer) {
+        for(AtlasObjectId objectId : request.getItemsToExport()) {
+            transformer.addFilter(objectId);
+        }
+    }
+
     private void updateTransformsWithSubTypes(ImportTransforms 
importTransforms) {
         String[] transformTypes = importTransforms.getTypes().toArray(new 
String[importTransforms.getTypes().size()]);
         for (int i = 0; i < transformTypes.length; i++) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
----------------------------------------------------------------------
diff --git 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
index cd623d0..1959576 100644
--- 
a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
+++ 
b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java
@@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasClassification;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
@@ -37,6 +38,8 @@ import static org.testng.Assert.assertTrue;
 
 public class ImportTransformsTest {
     private final String ATTR_NAME_QUALIFIED_NAME =  "qualifiedName";
+    private final String COLUMN_QUALIFIED_NAME_FORMAT = 
"col%s.TABLE1.default@cl1";
+
     private final String lowerCaseCL1   = "@cl1";
     private final String lowerCaseCL2   = "@cl2";
     private final String jsonLowerCaseReplace = "{ \"hive_table\": { 
\"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }";
@@ -48,6 +51,7 @@ public class ImportTransformsTest {
     private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ 
\"setDeleted\" ] } }";
     private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ 
\"addClassification:REPLICATED\" ] } }";
     private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ 
\"addClassification:REPLICATED_2\" ] } }";
+    private final String jsonAddClasificationScoped = "{ \"hive_column\": { 
\"*\":[ \"addClassification:REPLICATED_2:topLevel\" ] } }";
 
     private ImportTransforms transform;
     private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo";
@@ -210,6 +214,29 @@ public class ImportTransformsTest {
         addClassification_MultipleClassificationsAreAdded(entity);
     }
 
+    @Test
+    public void addScopedClassification() throws AtlasBaseException {
+        AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = 
getAtlasEntityWithExtInfo();
+        AtlasEntity entity = entityWithExtInfo.getReferredEntities().get("2");
+
+        int existingClassificationsCount =  
entityWithExtInfo.getEntity().getClassifications() != null ? 
entity.getClassifications().size() : 0;
+        ImportTransforms t = 
ImportTransforms.fromJson(jsonAddClasificationScoped);
+
+        assertTrue(t.getTransforms().size() > 0);
+
+        ImportTransformer.AddClassification classification = 
(ImportTransformer.AddClassification) 
t.getTransforms().get("hive_column").get("*").get(0);
+        AtlasObjectId objectId = new AtlasObjectId("hive_column", 
ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, 2));
+        classification.addFilter(objectId);
+        t.apply(entityWithExtInfo);
+
+        assertNotNull(t);
+
+        assertNull(entityWithExtInfo.getEntity().getClassifications());
+        
assertNull(entityWithExtInfo.getReferredEntities().get("0").getClassifications());
+        
assertEquals(entityWithExtInfo.getReferredEntities().get("1").getClassifications().size(),
 existingClassificationsCount + 1);
+        
assertNull(entityWithExtInfo.getReferredEntities().get("2").getClassifications());
+    }
+
     private void 
addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws 
AtlasBaseException {
         int existingClassificationsCount =  entity.getClassifications() != 
null ? entity.getClassifications().size() : 0;
         assertTrue(existingClassificationsCount > 0);
@@ -270,7 +297,7 @@ public class ImportTransformsTest {
         AtlasEntity entity = new AtlasEntity("hive_column");
 
         Map<String, Object> attributes = new HashMap<>();
-        attributes.put(ATTR_NAME_QUALIFIED_NAME, 
String.format("col%s.TABLE1.default@cl1", index));
+        attributes.put(ATTR_NAME_QUALIFIED_NAME, 
String.format(COLUMN_QUALIFIED_NAME_FORMAT, index));
         attributes.put("name", "col" + index);
 
         entity.setAttributes(attributes);

Reply via email to