ATLAS-2882: AddClassification transform for new transforms
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2111b95e Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2111b95e Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2111b95e Branch: refs/heads/branch-1.0 Commit: 2111b95e3bd492483fa5cd1df295819c15e258e8 Parents: 6c4d399 Author: Ashutosh Mestry <ames...@hortonworks.com> Authored: Thu Sep 20 12:54:36 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:55 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/entitytransform/Action.java | 68 +++++++++++ .../entitytransform/AtlasEntityTransformer.java | 11 +- .../entitytransform/BaseEntityHandler.java | 101 ++++++++++++++-- .../apache/atlas/entitytransform/Condition.java | 86 +++++++++++++ .../atlas/entitytransform/NeedsContext.java | 23 ++++ .../entitytransform/TransformerContext.java | 47 ++++++++ .../TransformationHandlerTest.java | 120 +++++++++++++------ .../atlas/repository/impexp/ImportService.java | 36 ++---- .../atlas/repository/impexp/ZipSource.java | 6 +- 9 files changed, 424 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/Action.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/Action.java b/intg/src/main/java/org/apache/atlas/entitytransform/Action.java index f01c6ce..fa18558 100644 --- a/intg/src/main/java/org/apache/atlas/entitytransform/Action.java +++ b/intg/src/main/java/org/apache/atlas/entitytransform/Action.java @@ -17,16 +17,26 @@ */ package org.apache.atlas.entitytransform; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.typedef.AtlasClassificationDef; +import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.commons.lang.StringUtils; import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; + public abstract class Action { private static final Logger LOG = LoggerFactory.getLogger(Action.class); + private static final String ENTITY_KEY = "__entity"; private static final String ACTION_DELIMITER = ":"; + private static final String ACTION_ADD_CLASSIFICATION = "ADDCLASSIFICATION"; private static final String ACTION_NAME_SET = "SET"; private static final String ACTION_NAME_REPLACE_PREFIX = "REPLACE_PREFIX"; private static final String ACTION_NAME_TO_LOWER = "TO_LOWER"; @@ -65,6 +75,10 @@ public abstract class Action { value = StringUtils.trim(value); switch (actionName.toUpperCase()) { + case ACTION_ADD_CLASSIFICATION: + ret = new AddClassificationAction(actionValue); + break; + case ACTION_NAME_REPLACE_PREFIX: ret = new PrefixReplaceAction(key, actionValue); break; @@ -115,6 +129,60 @@ public abstract class Action { } } + public static class AddClassificationAction extends Action implements NeedsContext { + + private final String classificationName; + private TransformerContext transformerContext; + + public AddClassificationAction(String classificationName) { + super(ENTITY_KEY); + + this.classificationName = classificationName; + } + + @Override + public void apply(AtlasTransformableEntity transformableEntity) { + AtlasEntity entity = transformableEntity.entity; + if (entity.getClassifications() == null) { + entity.setClassifications(new ArrayList<AtlasClassification>()); + } + + for (AtlasClassification c : entity.getClassifications()) { + if (c.getTypeName().equals(classificationName)) { + return; + } + } + + entity.getClassifications().add(new AtlasClassification(classificationName)); + } + + @Override + public void setContext(TransformerContext transformerContext) { + this.transformerContext = transformerContext; + getCreateTag(classificationName); + } + + private void getCreateTag(String classificationName) { + if (transformerContext == null) { + return; + } + + try { + AtlasClassificationDef classificationDef = transformerContext.getTypeRegistry().getClassificationDefByName(classificationName); + if (classificationDef != null) { + return; + } + + classificationDef = new AtlasClassificationDef(classificationName); + AtlasTypesDef typesDef = new AtlasTypesDef(); + typesDef.setClassificationDefs(Collections.singletonList(classificationDef)); + transformerContext.getTypeDefStore().createTypesDef(typesDef); + LOG.info("created classification: {}", classificationName); + } catch (AtlasBaseException e) { + LOG.error("Error creating classification: {}", classificationName, e); + } + } + } public static class PrefixReplaceAction extends Action { private final String fromPrefix; http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java b/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java index c14f2fd..e9b2afd 100644 --- a/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java +++ b/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java @@ -19,14 +19,17 @@ package org.apache.atlas.entitytransform; import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang.StringUtils; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; - - public class AtlasEntityTransformer { private final List<Condition> conditions; private final List<Action> actions; @@ -35,6 +38,10 @@ public class AtlasEntityTransformer { this(attributeTransform.getConditions(), attributeTransform.getAction()); } + public AtlasEntityTransformer(AtlasObjectId objectId, Map<String, String> actions) { + this(Collections.singletonMap("__entity", AtlasType.toJson(objectId)), actions); + } + public AtlasEntityTransformer(Map<String, String> conditions, Map<String, String> actions) { this.conditions = createConditions(conditions); this.actions = createActions(actions); http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java index 9d44043..dd6c665 100644 --- a/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java +++ b/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java @@ -17,9 +17,14 @@ */ package org.apache.atlas.entitytransform; +import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AttributeTransform; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +38,7 @@ public class BaseEntityHandler { protected final List<AtlasEntityTransformer> transformers; protected final boolean hasCustomAttributeTransformer; + private TransformerContext transformerContext; public BaseEntityHandler(List<AtlasEntityTransformer> transformers) { this(transformers, null); @@ -48,26 +54,45 @@ public class BaseEntityHandler { } public AtlasEntity transform(AtlasEntity entity) { - if (CollectionUtils.isNotEmpty(transformers)) { - AtlasTransformableEntity transformableEntity = getTransformableEntity(entity); + if (!CollectionUtils.isNotEmpty(transformers)) { + return entity; + } - if (transformableEntity != null) { - for (AtlasEntityTransformer transformer : transformers) { - transformer.transform(transformableEntity); - } + AtlasTransformableEntity transformableEntity = getTransformableEntity(entity); + if (transformableEntity == null) { + return entity; + } - transformableEntity.transformComplete(); - } + for (AtlasEntityTransformer transformer : transformers) { + transformer.transform(transformableEntity); } + transformableEntity.transformComplete(); + return entity; } + private void setContextForActions(List<Action> actions) { + for(Action action : actions) { + if (action instanceof NeedsContext) { + ((NeedsContext) action).setContext(transformerContext); + } + } + } + + private void setContextForConditions(List<Condition> conditions) { + for(Condition condition : conditions) { + if (condition instanceof NeedsContext) { + ((NeedsContext) condition).setContext(transformerContext); + } + } + } + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { return new AtlasTransformableEntity(entity); } - public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms) { + public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms, TransformerContext context) { if (LOG.isDebugEnabled()) { LOG.debug("==> BaseEntityHandler.createEntityHandlers(transforms={})", transforms); } @@ -92,10 +117,18 @@ public class BaseEntityHandler { for (BaseEntityHandler handler : handlers) { if (handler.hasCustomAttributeTransformer()) { ret.add(handler); + handler.setContext(context); } } if (CollectionUtils.isEmpty(ret)) { + BaseEntityHandler be = new BaseEntityHandler(transformers); + be.setContext(context); + + ret.add(be); + } + + if (CollectionUtils.isEmpty(ret)) { ret.add(new BaseEntityHandler(transformers)); } @@ -119,7 +152,20 @@ public class BaseEntityHandler { return false; } + public void setContext(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, AtlasExportRequest request) { + setContext(new TransformerContext(typeRegistry, typeDefStore, request)); + } + public void setContext(TransformerContext context) { + this.transformerContext = context; + + for (AtlasEntityTransformer transformer : transformers) { + if (transformerContext != null) { + setContextForActions(transformer.getActions()); + setContextForConditions(transformer.getConditions()); + } + } + } public static class AtlasTransformableEntity { protected final AtlasEntity entity; @@ -170,4 +216,41 @@ public class BaseEntityHandler { // implementations can override to set value of computed-attributes } } + + public static List<BaseEntityHandler> fromJson(String transformersString, TransformerContext context) { + if (StringUtils.isEmpty(transformersString)) { + return null; + } + + Object transformersObj = AtlasType.fromJson(transformersString, Object.class); + List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null; + + List<AttributeTransform> attributeTransforms = new ArrayList<>(); + + if (CollectionUtils.isEmpty(transformers)) { + return null; + } + + for (Object transformer : transformers) { + String transformerStr = AtlasType.toJson(transformer); + AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class); + + if (attributeTransform == null) { + continue; + } + + attributeTransforms.add(attributeTransform); + } + + if (CollectionUtils.isEmpty(attributeTransforms)) { + return null; + } + + List<BaseEntityHandler> entityHandlers = createEntityHandlers(attributeTransforms, context); + if (CollectionUtils.isEmpty(entityHandlers)) { + return null; + } + + return entityHandlers; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java index bc63079..174b9b4 100644 --- a/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java +++ b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java @@ -18,15 +18,25 @@ package org.apache.atlas.entitytransform; import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + public abstract class Condition { private static final Logger LOG = LoggerFactory.getLogger(Condition.class); private static final String CONDITION_DELIMITER = ":"; + private static final String CONDITION_ENTITY_OBJECT_ID = "OBJECTID"; + private static final String CONDITION_ENTITY_TOP_LEVEL = "TOPLEVEL"; + private static final String CONDITION_ENTITY_ALL = "ALL"; private static final String CONDITION_NAME_EQUALS = "EQUALS"; private static final String CONDITION_NAME_EQUALS_IGNORE_CASE = "EQUALS_IGNORE_CASE"; private static final String CONDITION_NAME_STARTS_WITH = "STARTS_WITH"; @@ -60,6 +70,18 @@ public abstract class Condition { value = StringUtils.trim(value); switch (conditionName.toUpperCase()) { + case CONDITION_ENTITY_ALL: + ret = new ObjectIdEquals(key, CONDITION_ENTITY_ALL); + break; + + case CONDITION_ENTITY_TOP_LEVEL: + ret = new ObjectIdEquals(key, CONDITION_ENTITY_TOP_LEVEL); + break; + + case CONDITION_ENTITY_OBJECT_ID: + ret = new ObjectIdEquals(key, conditionValue); + break; + case CONDITION_NAME_EQUALS: ret = new EqualsCondition(key, conditionValue); break; @@ -164,6 +186,70 @@ public abstract class Condition { } } + static class ObjectIdEquals extends Condition implements NeedsContext { + private final List<AtlasObjectId> objectIds; + private String scope; + private TransformerContext transformerContext; + + public ObjectIdEquals(String key, String conditionValue) { + super(key); + + objectIds = new ArrayList<>(); + this.scope = conditionValue; + } + + @Override + public boolean matches(AtlasTransformableEntity entity) { + for (AtlasObjectId objectId : objectIds) { + return isMatch(objectId, entity.entity); + } + + return objectIds.size() == 0; + } + + public void add(AtlasObjectId objectId) { + this.objectIds.add(objectId); + } + + private boolean isMatch(AtlasObjectId objectId, AtlasEntity entity) { + boolean ret = true; + if (!StringUtils.isEmpty(objectId.getGuid())) { + return Objects.equals(objectId.getGuid(), entity.getGuid()); + } + + ret = Objects.equals(objectId.getTypeName(), entity.getTypeName()); + if (!ret) { + return ret; + } + + for (Map.Entry<String, Object> entry : objectId.getUniqueAttributes().entrySet()) { + ret = ret && Objects.equals(entity.getAttribute(entry.getKey()), entry.getValue()); + if (!ret) { + break; + } + } + + return ret; + } + + @Override + public void setContext(TransformerContext transformerContext) { + this.transformerContext = transformerContext; + if(StringUtils.isEmpty(scope) || scope.equals(CONDITION_ENTITY_ALL)) { + return; + } + + addObjectIdsFromExportRequest(); + } + + private void addObjectIdsFromExportRequest() { + for(AtlasObjectId objectId : this.transformerContext.getExportRequest().getItemsToExport()) { + add(objectId); + } + } + } + + public static class HasValueCondition extends Condition { protected final String attributeValue; http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/NeedsContext.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/NeedsContext.java b/intg/src/main/java/org/apache/atlas/entitytransform/NeedsContext.java new file mode 100644 index 0000000..5c16bcf --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/NeedsContext.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.entitytransform; + +public interface NeedsContext { + void setContext(TransformerContext transformerContext); +} http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/main/java/org/apache/atlas/entitytransform/TransformerContext.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/TransformerContext.java b/intg/src/main/java/org/apache/atlas/entitytransform/TransformerContext.java new file mode 100644 index 0000000..a7a77b5 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/TransformerContext.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.store.AtlasTypeDefStore; +import org.apache.atlas.type.AtlasTypeRegistry; + +public class TransformerContext { + private final AtlasTypeRegistry typeRegistry; + private final AtlasTypeDefStore typeDefStore; + private final AtlasExportRequest exportRequest; + + public TransformerContext(AtlasTypeRegistry typeRegistry, AtlasTypeDefStore typeDefStore, AtlasExportRequest exportRequest) { + this.typeRegistry = typeRegistry; + this.typeDefStore = typeDefStore; + this.exportRequest = exportRequest; + } + + public AtlasTypeRegistry getTypeRegistry() { + return this.typeRegistry; + } + + public AtlasTypeDefStore getTypeDefStore() { + return typeDefStore; + } + + public AtlasExportRequest getExportRequest() { + return exportRequest; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java b/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java index a0ebe59..c76f959 100644 --- a/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java +++ b/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java @@ -19,8 +19,9 @@ package org.apache.atlas.entitytransform; import org.apache.atlas.model.impexp.AttributeTransform; import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasObjectId; +import org.apache.atlas.type.AtlasType; import org.apache.commons.lang.StringUtils; -import org.testng.Assert; import org.testng.annotations.Test; import java.util.ArrayList; @@ -30,6 +31,10 @@ import java.util.List; import java.util.Map; import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.apache.atlas.entitytransform.TransformationConstants.HIVE_TABLE; public class TransformationHandlerTest { @@ -50,9 +55,9 @@ public class TransformationHandlerTest { String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); if (endsWithCl1) { - Assert.assertTrue(transformedValue.endsWith("@cl2"), transformedValue + ": expected to end with @cl2"); + assertTrue(transformedValue.endsWith("@cl2"), transformedValue + ": expected to end with @cl2"); } else { - Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + assertEquals(qualifiedName, transformedValue, "not expected to change"); } } } @@ -76,9 +81,9 @@ public class TransformationHandlerTest { String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); if (endsWithCl1) { - Assert.assertTrue(transformedValue.endsWith("@CL1"), transformedValue + ": expected to end with @CL1"); + assertTrue(transformedValue.endsWith("@CL1"), transformedValue + ": expected to end with @CL1"); } else { - Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + assertEquals(qualifiedName, transformedValue, "not expected to change"); } } @@ -97,9 +102,9 @@ public class TransformationHandlerTest { String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); if (endsWithCL1) { - Assert.assertTrue(transformedValue.endsWith("@cl1"), transformedValue + ": expected to end with @cl1"); + assertTrue(transformedValue.endsWith("@cl1"), transformedValue + ": expected to end with @cl1"); } else { - Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + assertEquals(qualifiedName, transformedValue, "not expected to change"); } } } @@ -118,7 +123,7 @@ public class TransformationHandlerTest { String replicatedTo = (String) entity.getAttribute("replicatedTo"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isNotEmpty(replicatedTo)); + assertTrue(StringUtils.isNotEmpty(replicatedTo)); } applyTransforms(entity, handlers); @@ -126,7 +131,7 @@ public class TransformationHandlerTest { String transformedValue = (String) entity.getAttribute("replicatedTo"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isEmpty(transformedValue)); + assertTrue(StringUtils.isEmpty(transformedValue)); } } } @@ -149,8 +154,8 @@ public class TransformationHandlerTest { String replicatedFrom = (String) entity.getAttribute("replicatedFrom"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isNotEmpty(replicatedTo)); - Assert.assertTrue(StringUtils.isNotEmpty(replicatedFrom)); + assertTrue(StringUtils.isNotEmpty(replicatedTo)); + assertTrue(StringUtils.isNotEmpty(replicatedFrom)); } applyTransforms(entity, handlers); @@ -159,8 +164,8 @@ public class TransformationHandlerTest { replicatedFrom = (String) entity.getAttribute("replicatedFrom"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isEmpty(replicatedTo)); - Assert.assertTrue(StringUtils.isEmpty(replicatedFrom)); + assertTrue(StringUtils.isEmpty(replicatedTo)); + assertTrue(StringUtils.isEmpty(replicatedFrom)); } } } @@ -182,8 +187,8 @@ public class TransformationHandlerTest { String replicatedFrom = (String) entity.getAttribute("replicatedFrom"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isNotEmpty(replicatedTo)); - Assert.assertTrue(StringUtils.isNotEmpty(replicatedFrom)); + assertTrue(StringUtils.isNotEmpty(replicatedTo)); + assertTrue(StringUtils.isNotEmpty(replicatedFrom)); } applyTransforms(entity, handlers); @@ -192,8 +197,8 @@ public class TransformationHandlerTest { replicatedFrom = (String) entity.getAttribute("replicatedFrom"); if (entity.getTypeName() == HIVE_TABLE) { - Assert.assertTrue(StringUtils.isEmpty(replicatedTo)); - Assert.assertTrue(StringUtils.isEmpty(replicatedFrom)); + assertTrue(StringUtils.isEmpty(replicatedTo)); + assertTrue(StringUtils.isEmpty(replicatedFrom)); } } } @@ -215,9 +220,9 @@ public class TransformationHandlerTest { String transformedValue = (String) hdfsPath.getAttribute("name"); if (startsWith_aa_bb_) { - Assert.assertTrue(transformedValue.startsWith("/xx/yy/"), transformedValue + ": expected to start with /xx/yy/"); + assertTrue(transformedValue.startsWith("/xx/yy/"), transformedValue + ": expected to start with /xx/yy/"); } else { - Assert.assertEquals(name, transformedValue, "not expected to change"); + assertEquals(name, transformedValue, "not expected to change"); } } } @@ -241,11 +246,11 @@ public class TransformationHandlerTest { String transformedValue = (String) entity.getAttribute("qualifiedName"); if (!isHdfsPath && endsWithCl1) { - Assert.assertTrue(transformedValue.endsWith("@cl1_backup"), transformedValue + ": expected to end with @cl1_backup"); + assertTrue(transformedValue.endsWith("@cl1_backup"), transformedValue + ": expected to end with @cl1_backup"); } else if (!isHdfsPath && containsCl1) { - Assert.assertTrue(transformedValue.contains("@cl1_backup"), transformedValue + ": expected to contains @cl1_backup"); + assertTrue(transformedValue.contains("@cl1_backup"), transformedValue + ": expected to contains @cl1_backup"); } else { - Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + assertEquals(qualifiedName, transformedValue, "not expected to change"); } } } @@ -266,11 +271,11 @@ public class TransformationHandlerTest { applyTransforms(entity, handlers); if (startsWithHrDot) { - Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup.")); + assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup.")); } else if (startsWithHrAt) { - Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup@")); + assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup@")); } else { - Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); } } } @@ -293,11 +298,11 @@ public class TransformationHandlerTest { applyTransforms(entity, handlers); if (startsWithHrEmployeesDot) { - Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup.")); + assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup.")); } else if (startsWithHrEmployeesAt) { - Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup@")); + assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup@")); } else { - Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); } } } @@ -320,15 +325,56 @@ public class TransformationHandlerTest { applyTransforms(entity, handlers); if (startsWithHrEmployeesAgeAt) { - Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees.age_backup@")); + assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees.age_backup@")); } else { - Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + } + } + } + + @Test + public void verifyAddClassification() { + AtlasEntityTransformer entityTransformer = new AtlasEntityTransformer( + Collections.singletonMap("hdfs_path.qualifiedName", "EQUALS: hr@cl1"), + Collections.singletonMap("__entity", "addClassification: replicated") + ); + + List<BaseEntityHandler> handlers = new ArrayList<>(); + handlers.add(new BaseEntityHandler(Collections.singletonList(entityTransformer))); + assertApplyTransform(handlers); + } + + @Test + public void verifyAddClassificationUsingScope() { + AtlasObjectId objectId = new AtlasObjectId("hive_db", Collections.singletonMap("qualifiedName", "hr@cl1")); + AtlasEntityTransformer entityTransformer = new AtlasEntityTransformer( + Collections.singletonMap("__entity", "topLevel: "), + Collections.singletonMap("__entity", "addClassification: replicated") + ); + + List<BaseEntityHandler> handlers = new ArrayList<>(); + handlers.add(new BaseEntityHandler(Collections.singletonList(entityTransformer))); + Condition condition = handlers.get(0).transformers.get(0).getConditions().get(0); + Condition.ObjectIdEquals objectIdEquals = (Condition.ObjectIdEquals) condition; + objectIdEquals.add(objectId); + + assertApplyTransform(handlers); + } + + private void assertApplyTransform(List<BaseEntityHandler> handlers) { + for (AtlasEntity entity : getAllEntities()) { + applyTransforms(entity, handlers); + + if(entity.getAttribute("qualifiedName").equals("hr@cl1")) { + assertNotNull(entity.getClassifications()); + } else{ + assertNull(entity.getClassifications()); } } } private List<BaseEntityHandler> initializeHandlers(List<AttributeTransform> params) { - return BaseEntityHandler.createEntityHandlers(params); + return BaseEntityHandler.createEntityHandlers(params, null); } private void applyTransforms(AtlasEntity entity, List<BaseEntityHandler> handlers) { @@ -425,10 +471,12 @@ public class TransformationHandlerTest { } private AtlasEntity getHiveTableEntity(String clusterName, String dbName, String tableName) { + String qualifiedName = dbName + "." + tableName + "@" + clusterName; + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_TABLE); entity.setAttribute("name", tableName); - entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName); + entity.setAttribute("qualifiedName", qualifiedName); entity.setAttribute("owner", "hive"); entity.setAttribute("temporary", false); entity.setAttribute("lastAccessTime", "1535656355000"); @@ -442,11 +490,13 @@ public class TransformationHandlerTest { } private AtlasEntity getHiveStorageDescriptorEntity(String clusterName, String dbName, String tableName) { + String qualifiedName = "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db" + "/" + tableName; + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_STORAGE_DESCRIPTOR); entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName + "_storage"); entity.setAttribute("storedAsSubDirectories", false); - entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db" + "/" + tableName); + entity.setAttribute("location", qualifiedName); entity.setAttribute("compressed", false); entity.setAttribute("inputFormat", "org.apache.hadoop.mapred.TextInputFormat"); entity.setAttribute("outputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"); @@ -456,10 +506,12 @@ public class TransformationHandlerTest { } private AtlasEntity getHiveColumnEntity(String clusterName, String dbName, String tableName, String columnName) { + String qualifiedName = dbName + "." + tableName + "." + columnName + "@" + clusterName; + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_COLUMN); entity.setAttribute("owner", "hive"); - entity.setAttribute("qualifiedName", dbName + "." + tableName + "." + columnName +"@" + clusterName); + entity.setAttribute("qualifiedName", qualifiedName); entity.setAttribute("name", columnName); entity.setAttribute("position", 1); entity.setAttribute("type", "string"); http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index a09385e..b5d8b7c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -20,10 +20,10 @@ package org.apache.atlas.repository.impexp; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.entitytransform.BaseEntityHandler; +import org.apache.atlas.entitytransform.TransformerContext; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; -import org.apache.atlas.model.impexp.AttributeTransform; import org.apache.atlas.model.typedef.AtlasTypesDef; import org.apache.atlas.repository.store.graph.BulkImporter; import org.apache.atlas.store.AtlasTypeDefStore; @@ -42,7 +42,6 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import static org.apache.atlas.model.impexp.AtlasImportRequest.TRANSFORMERS_KEY; @@ -131,36 +130,19 @@ public class ImportService { } - private void setEntityTransformerHandlers(ZipSource source, String transformersString) { - if (StringUtils.isEmpty(transformersString)) { + @VisibleForTesting + void setEntityTransformerHandlers(ZipSource source, String transformersJson) throws AtlasBaseException { + if (StringUtils.isEmpty(transformersJson)) { return; } - Object transformersObj = AtlasType.fromJson(transformersString, Object.class); - List transformers = (transformersObj != null && transformersObj instanceof List) ? (List) transformersObj : null; - - List<AttributeTransform> attributeTransforms = new ArrayList<>(); - - if (CollectionUtils.isNotEmpty(transformers)) { - for (Object transformer : transformers) { - String transformerStr = AtlasType.toJson(transformer); - AttributeTransform attributeTransform = AtlasType.fromJson(transformerStr, AttributeTransform.class); - - if (attributeTransform == null) { - continue; - } - - attributeTransforms.add(attributeTransform); - } + TransformerContext context = new TransformerContext(typeRegistry, typeDefStore, source.getExportResult().getRequest()); + List<BaseEntityHandler> entityHandlers = BaseEntityHandler.fromJson(transformersJson, context); + if (CollectionUtils.isEmpty(entityHandlers)) { + return; } - if (CollectionUtils.isNotEmpty(attributeTransforms)) { - List<BaseEntityHandler> entityHandlers = BaseEntityHandler.createEntityHandlers(attributeTransforms); - - if (CollectionUtils.isNotEmpty(entityHandlers)) { - source.setEntityHandlers(entityHandlers); - } - } + source.setEntityHandlers(entityHandlers); } private void debugLog(String s, Object... params) { http://git-wip-us.apache.org/repos/asf/atlas/blob/2111b95e/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java index 1f436ce..a292b96 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ZipSource.java @@ -138,10 +138,12 @@ public class ZipSource implements EntityImportStream { String s = getFromCache(guid); AtlasEntityWithExtInfo entityWithExtInfo = convertFromJson(AtlasEntityWithExtInfo.class, s); + if (importTransform != null) { + entityWithExtInfo = importTransform.apply(entityWithExtInfo); + } + if (entityHandlers != null) { applyTransformers(entityWithExtInfo); - } else if (importTransform != null) { - entityWithExtInfo = importTransform.apply(entityWithExtInfo); } return entityWithExtInfo;