ATLAS-2870: Improvement to AddClassification transform to use filters. Signed-off-by: Ashutosh Mestry <ames...@hortonworks.com>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/e4414155 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/e4414155 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/e4414155 Branch: refs/heads/branch-1.0 Commit: e4414155070602650db466480e59b3aa5b02ea7f Parents: 83156a2 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Tue Sep 11 17:06:43 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:55 2018 -0700 ---------------------------------------------------------------------- .../atlas/repository/impexp/ImportService.java | 2 +- .../repository/impexp/ImportTransformer.java | 58 +++++++++++++++++++- .../impexp/ImportTransformsShaper.java | 16 +++++- .../repository/impexp/ImportTransformsTest.java | 29 +++++++++- 4 files changed, 97 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index 095f60f..a88ba2b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -112,7 +112,7 @@ public class ImportService { return; } - importTransformsShaper.shape(importTransform); + importTransformsShaper.shape(importTransform, source.getExportResult().getRequest()); source.setImportTransform(importTransform); if(LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java index 70117f6..7bc3536 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java @@ -21,10 +21,14 @@ import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.commons.lang.StringUtils; +import scala.Tuple3; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Objects; public abstract class ImportTransformer { @@ -71,8 +75,8 @@ public abstract class ImportTransformer { } else if (key.equals(TRANSFORMER_SET_DELETED)) { ret = new SetDeleted(); } else if (key.equals(TRANSFORMER_NAME_ADD_CLASSIFICATION)) { - String name = (params == null || params.length < 1) ? "" : StringUtils.join(params, ":", 1, params.length); - ret = new AddClassification(name); + String name = (params == null || params.length < 1) ? "" : params[1]; + ret = new AddClassification(name, (params != null && params.length == 3) ? params[2] : ""); } else { throw new AtlasBaseException(AtlasErrorCode.INVALID_VALUE, "Error creating ImportTransformer. Unknown transformer: {}.", transformerSpec); } @@ -151,12 +155,22 @@ public abstract class ImportTransformer { } static class AddClassification extends ImportTransformer { + private static final String FILTER_SCOPE_TOP_LEVEL = "topLevel"; + + private final String scope; private final String classificationName; + private List<AtlasObjectId> filters; - public AddClassification(String name) { + public AddClassification(String name, String scope) { super(TRANSFORMER_NAME_REMOVE_CLASSIFICATION); this.classificationName = name; + this.scope = scope; + filters = new ArrayList<>(); + } + + public void addFilter(AtlasObjectId objectId) { + filters.add(objectId); } @Override @@ -166,6 +180,10 @@ public abstract class ImportTransformer { } AtlasEntity entity = (AtlasEntity) o; + if(!passThruFilters(entity)) { + return o; + } + if(entity.getClassifications() == null) { entity.setClassifications(new ArrayList<AtlasClassification>()); } @@ -180,6 +198,40 @@ public abstract class ImportTransformer { return entity; } + private boolean passThruFilters(AtlasEntity entity) { + if(StringUtils.isEmpty(scope) || !scope.equals(FILTER_SCOPE_TOP_LEVEL)) { + return true; + } + + for (AtlasObjectId filter : filters) { + if(isMatch(filter, entity)) { + return true; + } + } + + return false; + } + + private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) { + boolean ret = true; + if (StringUtils.isEmpty(objectId.getGuid())) { + ret = Objects.equals(objectId.getTypeName(), entity.getTypeName()); + if (ret) { + for (Map.Entry<String, Object> entry : objectId.getUniqueAttributes().entrySet()) { + ret = ret && Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue()); + if (!ret) { + break; + } + } + } + + return ret; + + } else { + return Objects.equals(objectId.getGuid(), entity.getGuid()); + } + } + @Override public String toString() { return String.format("%s=%s", "AddClassification", classificationName); http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java index 62eba45..ce0c8f1 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformsShaper.java @@ -19,6 +19,8 @@ package org.apache.atlas.repository.impexp; import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.typedef.AtlasClassificationDef; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.store.AtlasTypeDefStore; @@ -32,6 +34,7 @@ import javax.inject.Inject; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; @Component public class ImportTransformsShaper { @@ -46,12 +49,12 @@ public class ImportTransformsShaper { this.typeDefStore = typeDefStore; } - public void shape(ImportTransforms importTransform) throws AtlasBaseException { - getCreateClassifications(importTransform); + public void shape(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException { + getCreateClassifications(importTransform, request); updateTransformsWithSubTypes(importTransform); } - private void getCreateClassifications(ImportTransforms importTransform) throws AtlasBaseException { + private void getCreateClassifications(ImportTransforms importTransform, AtlasExportRequest request) throws AtlasBaseException { Map<String, Map<String, List<ImportTransformer>>> mapMapList = importTransform.getTransforms(); for (Map<String, List<ImportTransformer>> mapList : mapMapList.values()) { for (List<ImportTransformer> list : mapList.values()) { @@ -59,6 +62,7 @@ public class ImportTransformsShaper { if((importTransformer instanceof ImportTransformer.AddClassification)) { ImportTransformer.AddClassification addClassification = (ImportTransformer.AddClassification) importTransformer; + addFilters(request, addClassification); getCreateTag(addClassification.getClassificationName()); } } @@ -66,6 +70,12 @@ public class ImportTransformsShaper { } } + private void addFilters(AtlasExportRequest request, ImportTransformer.AddClassification transformer) { + for(AtlasObjectId objectId : request.getItemsToExport()) { + transformer.addFilter(objectId); + } + } + private void updateTransformsWithSubTypes(ImportTransforms importTransforms) { String[] transformTypes = importTransforms.getTypes().toArray(new String[importTransforms.getTypes().size()]); for (int i = 0; i < transformTypes.length; i++) { http://git-wip-us.apache.org/repos/asf/atlas/blob/e4414155/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java index cd623d0..1959576 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ImportTransformsTest.java @@ -21,6 +21,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.instance.AtlasObjectId; import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -37,6 +38,8 @@ import static org.testng.Assert.assertTrue; public class ImportTransformsTest { private final String ATTR_NAME_QUALIFIED_NAME = "qualifiedName"; + private final String COLUMN_QUALIFIED_NAME_FORMAT = "col%s.TABLE1.default@cl1"; + private final String lowerCaseCL1 = "@cl1"; private final String lowerCaseCL2 = "@cl2"; private final String jsonLowerCaseReplace = "{ \"hive_table\": { \"qualifiedName\":[ \"lowercase\", \"replace:@cl1:@cl2\" ] } }"; @@ -48,6 +51,7 @@ public class ImportTransformsTest { private final String jsonSetDeleted = "{ \"hive_table\": { \"*\":[ \"setDeleted\" ] } }"; private final String jsonAddClasification = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED\" ] } }"; private final String jsonAddClasification2 = "{ \"hive_table\": { \"*\":[ \"addClassification:REPLICATED_2\" ] } }"; + private final String jsonAddClasificationScoped = "{ \"hive_column\": { \"*\":[ \"addClassification:REPLICATED_2:topLevel\" ] } }"; private ImportTransforms transform; private String HIVE_TABLE_ATTR_SYNC_INFO = "hive_table.syncInfo"; @@ -210,6 +214,29 @@ public class ImportTransformsTest { addClassification_MultipleClassificationsAreAdded(entity); } + @Test + public void addScopedClassification() throws AtlasBaseException { + AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = getAtlasEntityWithExtInfo(); + AtlasEntity entity = entityWithExtInfo.getReferredEntities().get("2"); + + int existingClassificationsCount = entityWithExtInfo.getEntity().getClassifications() != null ? entity.getClassifications().size() : 0; + ImportTransforms t = ImportTransforms.fromJson(jsonAddClasificationScoped); + + assertTrue(t.getTransforms().size() > 0); + + ImportTransformer.AddClassification classification = (ImportTransformer.AddClassification) t.getTransforms().get("hive_column").get("*").get(0); + AtlasObjectId objectId = new AtlasObjectId("hive_column", ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, 2)); + classification.addFilter(objectId); + t.apply(entityWithExtInfo); + + assertNotNull(t); + + assertNull(entityWithExtInfo.getEntity().getClassifications()); + assertNull(entityWithExtInfo.getReferredEntities().get("0").getClassifications()); + assertEquals(entityWithExtInfo.getReferredEntities().get("1").getClassifications().size(), existingClassificationsCount + 1); + assertNull(entityWithExtInfo.getReferredEntities().get("2").getClassifications()); + } + private void addClassification_ExistingClassificationsAreHandled(AtlasEntity entity) throws AtlasBaseException { int existingClassificationsCount = entity.getClassifications() != null ? entity.getClassifications().size() : 0; assertTrue(existingClassificationsCount > 0); @@ -270,7 +297,7 @@ public class ImportTransformsTest { AtlasEntity entity = new AtlasEntity("hive_column"); Map<String, Object> attributes = new HashMap<>(); - attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format("col%s.TABLE1.default@cl1", index)); + attributes.put(ATTR_NAME_QUALIFIED_NAME, String.format(COLUMN_QUALIFIED_NAME_FORMAT, index)); attributes.put("name", "col" + index); entity.setAttributes(attributes);