ATLAS-2873: Atlas Import Transform Handler Implementation
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/dde93556 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/dde93556 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/dde93556 Branch: refs/heads/branch-1.0 Commit: dde93556068bc17472f8594d4383ed409af413b1 Parents: 558fe96 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Mon Sep 17 06:18:07 2018 -0700 Committer: Ashutosh Mestry <ames...@hortonworks.com> Committed: Thu Nov 1 15:42:55 2018 -0700 ---------------------------------------------------------------------- .../apache/atlas/entitytransform/Action.java | 199 ++++++++++ .../entitytransform/AtlasEntityTransformer.java | 94 +++++ .../entitytransform/BaseEntityHandler.java | 165 +++++++++ .../apache/atlas/entitytransform/Condition.java | 161 ++++++++ .../entitytransform/HdfsPathEntityHandler.java | 170 +++++++++ .../HiveColumnEntityHandler.java | 139 +++++++ .../HiveDatabaseEntityHandler.java | 113 ++++++ .../HiveStorageDescriptorEntityHandler.java | 143 +++++++ .../entitytransform/HiveTableEntityHandler.java | 127 +++++++ .../TransformationConstants.java | 48 +++ .../atlas/model/impexp/AttributeTransform.java | 87 +++++ .../TransformationHandlerTest.java | 370 +++++++++++++++++++ .../repository/impexp/ImportTransformer.java | 1 - 13 files changed, 1816 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/Action.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/Action.java b/intg/src/main/java/org/apache/atlas/entitytransform/Action.java new file mode 100644 index 0000000..ca5f3a8 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/Action.java @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.commons.lang.StringUtils; +import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class Action { + private static final Logger LOG = LoggerFactory.getLogger(Action.class); + + private static final String ACTION_DELIMITER = ":"; + private static final String ACTION_NAME_SET = "SET"; + private static final String ACTION_NAME_REPLACE_PREFIX = "REPLACE_PREFIX"; + private static final String ACTION_NAME_TO_LOWER = "TO_LOWER"; + private static final String ACTION_NAME_TO_UPPER = "TO_UPPER"; + + protected final String attributeName; + + + protected Action(String attributeName) { + this.attributeName = attributeName; + } + + public String getAttributeName() { return attributeName; } + + public boolean isValid() { + return StringUtils.isNotEmpty(attributeName); + } + + public abstract void apply(AtlasTransformableEntity entity); + + + public static Action createAction(String key, String value) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> Action.createAction(key={}, value={})", key, value); + } + + final Action ret; + + int idxActionDelim = value == null ? -1 : value.indexOf(ACTION_DELIMITER); + String actionName = idxActionDelim == -1 ? ACTION_NAME_SET : value.substring(0, idxActionDelim); + String actionValue = idxActionDelim == -1 ? value : value.substring(idxActionDelim + ACTION_DELIMITER.length()); + + actionName = StringUtils.trim(actionName); + actionValue = StringUtils.trim(actionValue); + value = StringUtils.trim(value); + + switch (actionName.toUpperCase()) { + case ACTION_NAME_REPLACE_PREFIX: + ret = new PrefixReplaceAction(key, actionValue); + break; + + case ACTION_NAME_TO_LOWER: + ret = new ToLowerCaseAction(key); + break; + + case ACTION_NAME_TO_UPPER: + ret = new ToUpperCaseAction(key); + break; + + case ACTION_NAME_SET: + ret = new SetAction(key, actionValue); + break; + + default: + ret = new SetAction(key, value); // treat unspecified/unknown action as 'SET' + break; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== Action.createAction(key={}, value={}): actionName={}, actionValue={}, ret={}", key, value, actionName, actionValue, ret); + } + + return ret; + } + + + public static class SetAction extends Action { + private final String attributeValue; + + public SetAction(String attributeName, String attributeValue) { + super(attributeName); + + this.attributeValue = attributeValue; + } + + @Override + public void apply(AtlasTransformableEntity entity) { + if (isValid()) { + entity.setAttribute(attributeName, attributeValue); + } + } + } + + + public static class PrefixReplaceAction extends Action { + private final String fromPrefix; + private final String toPrefix; + + public PrefixReplaceAction(String attributeName, String actionValue) { + super(attributeName); + + // actionValue => =:prefixToReplace=replacedValue + if (actionValue != null) { + int idxSepDelimiter = actionValue.indexOf(ACTION_DELIMITER); + + if (idxSepDelimiter == -1) { // no separator specified i.e. no value specified to replace; remove the prefix + fromPrefix = actionValue; + toPrefix = ""; + } else { + String prefixSep = StringUtils.trim(actionValue.substring(0, idxSepDelimiter)); + int idxPrefixSep = actionValue.indexOf(prefixSep, idxSepDelimiter + ACTION_DELIMITER.length()); + + if (idxPrefixSep == -1) { // separator not found i.e. no value specified to replace; remove the prefix + fromPrefix = actionValue.substring(idxSepDelimiter + ACTION_DELIMITER.length()); + toPrefix = ""; + } else { + fromPrefix = actionValue.substring(idxSepDelimiter + ACTION_DELIMITER.length(), idxPrefixSep); + toPrefix = actionValue.substring(idxPrefixSep + prefixSep.length()); + } + } + } else { + fromPrefix = null; + toPrefix = ""; + } + } + + @Override + public boolean isValid() { + return super.isValid() && StringUtils.isNotEmpty(fromPrefix); + } + + @Override + public void apply(AtlasTransformableEntity entity) { + if (isValid()) { + Object currValue = entity.getAttribute(attributeName); + String strValue = currValue != null ? currValue.toString() : null; + + if (strValue != null && strValue.startsWith(fromPrefix)) { + entity.setAttribute(attributeName, StringUtils.replace(strValue, fromPrefix, toPrefix, 1)); + } + } + } + } + + public static class ToLowerCaseAction extends Action { + public ToLowerCaseAction(String attributeName) { + super(attributeName); + } + + @Override + public void apply(AtlasTransformableEntity entity) { + if (isValid()) { + Object currValue = entity.getAttribute(attributeName); + String strValue = currValue instanceof String ? (String) currValue : null; + + if (strValue != null) { + entity.setAttribute(attributeName, strValue.toLowerCase()); + } + } + } + } + + public static class ToUpperCaseAction extends Action { + public ToUpperCaseAction(String attributeName) { + super(attributeName); + } + + @Override + public void apply(AtlasTransformableEntity entity) { + if (isValid()) { + Object currValue = entity.getAttribute(attributeName); + String strValue = currValue instanceof String ? (String) currValue : null; + + if (strValue != null) { + entity.setAttribute(attributeName, strValue.toUpperCase()); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java b/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java new file mode 100644 index 0000000..c14f2fd --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/AtlasEntityTransformer.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; +import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.commons.collections.MapUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + + +public class AtlasEntityTransformer { + private final List<Condition> conditions; + private final List<Action> actions; + + public AtlasEntityTransformer(AttributeTransform attributeTransform) { + this(attributeTransform.getConditions(), attributeTransform.getAction()); + } + + public AtlasEntityTransformer(Map<String, String> conditions, Map<String, String> actions) { + this.conditions = createConditions(conditions); + this.actions = createActions(actions); + } + + public List<Condition> getConditions() { + return conditions; + } + + public List<Action> getActions() { + return actions; + } + + public void transform(AtlasTransformableEntity entity) { + if (entity != null) { + boolean matches = true; + + for (Condition condition : conditions) { + matches = matches && condition.matches(entity); + } + + if (matches) { + for (Action action : actions) { + action.apply(entity); + } + } + } + } + + private List<Condition> createConditions(Map<String, String> conditions) { + List<Condition> ret = new ArrayList<>(); + + if (MapUtils.isNotEmpty(conditions)) { + for (Map.Entry<String, String> entry : conditions.entrySet()) { + Condition condition = Condition.createCondition(entry.getKey(), entry.getValue()); + + ret.add(condition); + } + } + + return ret; + } + + private List<Action> createActions(Map<String, String> actions) { + List<Action> ret = new ArrayList<>(); + + if (MapUtils.isNotEmpty(actions)) { + for (Map.Entry<String, String> entry : actions.entrySet()) { + Action action = Action.createAction(entry.getKey(), entry.getValue()); + + ret.add(action); + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java new file mode 100644 index 0000000..c1f2869 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/BaseEntityHandler.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.TYPE_NAME_ATTRIBUTE_NAME_SEP; + +public class BaseEntityHandler { + private static final Logger LOG = LoggerFactory.getLogger(BaseEntityHandler.class); + + protected final List<AtlasEntityTransformer> transformers; + protected final boolean hasCustomAttributeTransformer; + + public BaseEntityHandler(List<AtlasEntityTransformer> transformers) { + this(transformers, null); + } + + public BaseEntityHandler(List<AtlasEntityTransformer> transformers, List<String> customTransformAttributes) { + this.transformers = transformers; + this.hasCustomAttributeTransformer = hasTransformerForAnyAttribute(customTransformAttributes); + } + + public boolean hasCustomAttributeTransformer() { + return hasCustomAttributeTransformer; + } + + public AtlasEntity transform(AtlasEntity entity) { + if (CollectionUtils.isNotEmpty(transformers)) { + AtlasTransformableEntity transformableEntity = getTransformableEntity(entity); + + if (transformableEntity != null) { + for (AtlasEntityTransformer transformer : transformers) { + transformer.transform(transformableEntity); + } + + transformableEntity.transformComplete(); + } + } + + return entity; + } + + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return new AtlasTransformableEntity(entity); + } + + public static List<BaseEntityHandler> createEntityHandlers(List<AttributeTransform> transforms) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> BaseEntityHandler.createEntityHandlers(transforms={})", transforms); + } + + List<AtlasEntityTransformer> transformers = new ArrayList<>(); + + for (AttributeTransform transform : transforms) { + transformers.add(new AtlasEntityTransformer(transform)); + } + + BaseEntityHandler[] handlers = new BaseEntityHandler[] { + new HdfsPathEntityHandler(transformers), + new HiveDatabaseEntityHandler(transformers), + new HiveTableEntityHandler(transformers), + new HiveColumnEntityHandler(transformers), + new HiveStorageDescriptorEntityHandler(transformers) + }; + + List<BaseEntityHandler> ret = new ArrayList<>(); + + // include customer handlers, only if its customer attribute is transformed + for (BaseEntityHandler handler : handlers) { + if (handler.hasCustomAttributeTransformer()) { + ret.add(handler); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== BaseEntityHandler.createEntityHandlers(transforms={}): ret.size={}", transforms, ret.size()); + } + + return ret; + } + + private boolean hasTransformerForAnyAttribute(List<String> attributes) { + if (CollectionUtils.isNotEmpty(transformers) && CollectionUtils.isNotEmpty(attributes)) { + for (AtlasEntityTransformer transformer : transformers) { + for (Action action : transformer.getActions()) { + if (attributes.contains(action.getAttributeName())) { + return true; + } + } + } + } + + return false; + } + + + public static class AtlasTransformableEntity { + protected final AtlasEntity entity; + + protected AtlasTransformableEntity(AtlasEntity entity) { + this.entity = entity; + } + + public AtlasEntity getEntity() { + return entity; + } + + public Object getAttribute(String attributeName) { + Object ret = null; + + if (entity != null && attributeName != null) { + ret = entity.getAttribute(attributeName); + + if (ret == null) { // try after dropping typeName prefix, if attributeName contains it + int idxSep = attributeName.indexOf(TYPE_NAME_ATTRIBUTE_NAME_SEP); + + if (idxSep != -1) { + ret = entity.getAttribute(attributeName.substring(idxSep + 1)); + } + } + } + + return ret; + } + + public void setAttribute(String attributeName, String attributeValue) { + if (entity != null && attributeName != null) { + int idxSep = attributeName.indexOf(TYPE_NAME_ATTRIBUTE_NAME_SEP); // drop typeName prefix, if attributeName contains it + + if (idxSep != -1) { + entity.setAttribute(attributeName.substring(idxSep + 1), attributeValue); + } else { + entity.setAttribute(attributeName, attributeValue); + } + } + } + + public void transformComplete() { + // implementations can override to set value of computed-attributes + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java new file mode 100644 index 0000000..d44f575 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/Condition.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.entitytransform.BaseEntityHandler.AtlasTransformableEntity; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public abstract class Condition { + private static final Logger LOG = LoggerFactory.getLogger(Condition.class); + + private static final String CONDITION_DELIMITER = ":"; + private static final String CONDITION_NAME_EQUALS = "EQUALS"; + private static final String CONDITION_NAME_EQUALS_IGNORE_CASE = "EQUALS_IGNORE_CASE"; + private static final String CONDITION_NAME_STARTS_WITH = "STARTS_WITH"; + private static final String CONDITION_NAME_STARTS_WITH_IGNORE_CASE = "STARTS_WITH_IGNORE_CASE"; + + protected final String attributeName; + + protected Condition(String attributeName) { + this.attributeName = attributeName; + } + + public String getAttributeName() { return attributeName; } + + public abstract boolean matches(AtlasTransformableEntity entity); + + + public static Condition createCondition(String key, String value) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> Condition.createCondition(key={}, value={})", key, value); + } + + final Condition ret; + + int idxConditionDelim = value == null ? -1 : value.indexOf(CONDITION_DELIMITER); + String conditionName = idxConditionDelim == -1 ? CONDITION_NAME_EQUALS : value.substring(0, idxConditionDelim); + String conditionValue = idxConditionDelim == -1 ? value : value.substring(idxConditionDelim + CONDITION_DELIMITER.length()); + + conditionName = StringUtils.trim(conditionName); + conditionValue = StringUtils.trim(conditionValue); + value = StringUtils.trim(value); + + switch (conditionName.toUpperCase()) { + case CONDITION_NAME_EQUALS: + ret = new EqualsCondition(key, conditionValue); + break; + + case CONDITION_NAME_EQUALS_IGNORE_CASE: + ret = new EqualsIgnoreCaseCondition(key, conditionValue); + break; + + case CONDITION_NAME_STARTS_WITH: + ret = new StartsWithCondition(key, conditionValue); + break; + + case CONDITION_NAME_STARTS_WITH_IGNORE_CASE: + ret = new StartsWithIgnoreCaseCondition(key, conditionValue); + break; + + default: + ret = new EqualsCondition(key, value); // treat unspecified/unknown condition as 'EQUALS' + break; + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== Condition.createCondition(key={}, value={}): actionName={}, actionValue={}, ret={}", key, value, conditionName, conditionValue, ret); + } + + return ret; + } + + + public static class EqualsCondition extends Condition { + protected final String attributeValue; + + public EqualsCondition(String attributeName, String attributeValue) { + super(attributeName); + + this.attributeValue = attributeValue; + } + + @Override + public boolean matches(AtlasTransformableEntity entity) { + Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null; + + return attributeValue != null && StringUtils.equals(attributeValue.toString(), this.attributeValue); + } + } + + + public static class EqualsIgnoreCaseCondition extends Condition { + protected final String attributeValue; + + public EqualsIgnoreCaseCondition(String attributeName, String attributeValue) { + super(attributeName); + + this.attributeValue = attributeValue; + } + + @Override + public boolean matches(AtlasTransformableEntity entity) { + Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null; + + return attributeValue != null && StringUtils.equalsIgnoreCase(attributeValue.toString(), this.attributeValue); + } + } + + + public static class StartsWithCondition extends Condition { + protected final String prefix; + + public StartsWithCondition(String attributeName, String prefix) { + super(attributeName); + + this.prefix = prefix; + } + + @Override + public boolean matches(AtlasTransformableEntity entity) { + Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null; + + return attributeValue != null && StringUtils.startsWith(attributeValue.toString(), this.prefix); + } + } + + + public static class StartsWithIgnoreCaseCondition extends Condition { + protected final String prefix; + + public StartsWithIgnoreCaseCondition(String attributeName, String prefix) { + super(attributeName); + + this.prefix = prefix; + } + + @Override + public boolean matches(AtlasTransformableEntity entity) { + Object attributeValue = entity != null ? entity.getAttribute(attributeName) : null; + + return attributeValue != null && StringUtils.startsWithIgnoreCase(attributeValue.toString(), this.prefix); + } + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/HdfsPathEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/HdfsPathEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/HdfsPathEntityHandler.java new file mode 100644 index 0000000..1a398ea --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/HdfsPathEntityHandler.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.CLUSTER_DELIMITER; +import static org.apache.atlas.entitytransform.TransformationConstants.CLUSTER_NAME_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_CLUSTER_NAME_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH; +import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH_PATH_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH_NAME_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.NAME_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.PATH_ATTRIBUTE; +import static org.apache.atlas.entitytransform.TransformationConstants.QUALIFIED_NAME_ATTRIBUTE; + + +public class HdfsPathEntityHandler extends BaseEntityHandler { + private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HDFS_PATH_NAME_ATTRIBUTE, HDFS_PATH_PATH_ATTRIBUTE, HDFS_CLUSTER_NAME_ATTRIBUTE); + + public HdfsPathEntityHandler(List<AtlasEntityTransformer> transformers) { + super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES); + } + + @Override + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return isHdfsPathEntity(entity) ? new HdfsPathEntity(entity) : null; + } + + private boolean isHdfsPathEntity(AtlasEntity entity) { + return StringUtils.equals(entity.getTypeName(), HDFS_PATH); + } + + + public static class HdfsPathEntity extends AtlasTransformableEntity { + private String clusterName; + private String path; + private String name; + private String pathPrefix; + private boolean isPathUpdated = false; + private boolean isCustomerAttributeUpdated = false; + + + public HdfsPathEntity(AtlasEntity entity) { + super(entity); + + this.path = (String) entity.getAttribute(PATH_ATTRIBUTE); + this.name = (String) entity.getAttribute(NAME_ATTRIBUTE); + + String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE); + + if (qualifiedName != null) { + int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER); + + if (clusterSeparatorIdx != -1) { + this.clusterName = qualifiedName.substring(clusterSeparatorIdx + 1); + } else { + this.clusterName = ""; + } + + if (StringUtils.isNotEmpty(path) && StringUtils.isNotEmpty(name)) { + int idx = path.indexOf(name); + + if (idx != -1) { + this.pathPrefix = path.substring(0, idx); + } else { + this.pathPrefix = ""; + } + } + } else { + this.clusterName = ""; + this.pathPrefix = ""; + } + } + + @Override + public Object getAttribute(String attributeName) { + switch (attributeName) { + case HDFS_CLUSTER_NAME_ATTRIBUTE: + return clusterName; + + case HDFS_PATH_NAME_ATTRIBUTE: + return name; + + case HDFS_PATH_PATH_ATTRIBUTE: + return path; + } + + return super.getAttribute(attributeName); + } + + @Override + public void setAttribute(String attributeName, String attributeValue) { + switch (attributeName) { + case HDFS_CLUSTER_NAME_ATTRIBUTE: + clusterName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HDFS_PATH_NAME_ATTRIBUTE: + name = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HDFS_PATH_PATH_ATTRIBUTE: + path = attributeValue; + + isPathUpdated = true; + isCustomerAttributeUpdated = true; + break; + + default: + super.setAttribute(attributeName, attributeValue); + break; + } + } + + @Override + public void transformComplete() { + if (isCustomerAttributeUpdated) { + entity.setAttribute(CLUSTER_NAME_ATTRIBUTE, clusterName); + entity.setAttribute(NAME_ATTRIBUTE, name); + entity.setAttribute(PATH_ATTRIBUTE, toPath()); + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName()); + } + } + + + private String toQualifiedName() { + return StringUtils.isEmpty(clusterName) ? toPath() : String.format("%s@%s", toPath(), clusterName); + } + + private String toPath() { + final String ret; + + if (isPathUpdated) { + ret = path; + } else { + if (StringUtils.isNotEmpty(pathPrefix)) { + ret = pathPrefix + name; + } else { + ret = name; + } + } + + return ret; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/HiveColumnEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/HiveColumnEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/HiveColumnEntityHandler.java new file mode 100644 index 0000000..fca94b6 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/HiveColumnEntityHandler.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.*; + + +public class HiveColumnEntityHandler extends BaseEntityHandler { + private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_COLUMN_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE); + + public HiveColumnEntityHandler(List<AtlasEntityTransformer> transformers) { + super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES); + } + + @Override + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return isHiveColumnEntity(entity) ? new HiveColumnEntity(entity) : null; + } + + private boolean isHiveColumnEntity(AtlasEntity entity) { + return StringUtils.equals(entity.getTypeName(), HIVE_COLUMN); + } + + + public static class HiveColumnEntity extends AtlasTransformableEntity { + private String databaseName; + private String tableName; + private String columnName; + private String clusterName; + private boolean isCustomerAttributeUpdated = false; + + public HiveColumnEntity(AtlasEntity entity) { + super(entity); + + this.columnName = (String) entity.getAttribute(NAME_ATTRIBUTE); + + String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE); + + if (qualifiedName != null) { + int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER); + int tableSeparatorIdx = databaseSeparatorIdx != -1 ? qualifiedName.indexOf(DATABASE_DELIMITER, databaseSeparatorIdx + 1) : - 1; + int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER); + + this.databaseName = (databaseSeparatorIdx != -1) ? qualifiedName.substring(0, databaseSeparatorIdx).trim() : ""; + this.tableName = (tableSeparatorIdx != -1) ? qualifiedName.substring(databaseSeparatorIdx + 1, tableSeparatorIdx).trim() : ""; + this.clusterName = (clusterSeparatorIdx != -1) ? qualifiedName.substring(clusterSeparatorIdx + 1).trim() : ""; + } else { + this.databaseName = ""; + this.tableName = ""; + this.clusterName = ""; + } + } + + @Override + public Object getAttribute(String attributeName) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + return databaseName; + + case HIVE_TABLE_NAME_ATTRIBUTE: + return tableName; + + case HIVE_COLUMN_NAME_ATTRIBUTE: + return columnName; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + return clusterName; + } + + return super.getAttribute(attributeName); + } + + @Override + public void setAttribute(String attributeName, String attributeValue) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + databaseName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_TABLE_NAME_ATTRIBUTE: + tableName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_COLUMN_NAME_ATTRIBUTE: + columnName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + clusterName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + default: + super.setAttribute(attributeName, attributeValue); + break; + } + } + + @Override + public void transformComplete() { + if (isCustomerAttributeUpdated) { + entity.setAttribute(NAME_ATTRIBUTE, columnName); + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName()); + } + } + + private String toQualifiedName() { + return String.format("%s.%s.%s@%s", databaseName, tableName, columnName, clusterName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/HiveDatabaseEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/HiveDatabaseEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/HiveDatabaseEntityHandler.java new file mode 100644 index 0000000..8a2e813 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/HiveDatabaseEntityHandler.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.*; + +public class HiveDatabaseEntityHandler extends BaseEntityHandler { + private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE); + + public HiveDatabaseEntityHandler(List<AtlasEntityTransformer> transformers) { + super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES); + } + + @Override + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return isHiveDatabaseEntity(entity) ? new HiveDatabaseEntity(entity) : null; + } + + + private boolean isHiveDatabaseEntity(AtlasEntity entity) { + return StringUtils.equals(entity.getTypeName(), HIVE_DATABASE); + } + + private static class HiveDatabaseEntity extends AtlasTransformableEntity { + private String databaseName; + private String clusterName; + private boolean isCustomerAttributeUpdated = false; + + public HiveDatabaseEntity(AtlasEntity entity) { + super(entity); + + this.databaseName = (String) entity.getAttribute(NAME_ATTRIBUTE); + + String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE); + + if (qualifiedName != null) { + int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER); + + this.clusterName = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : ""; + } else { + this.clusterName = ""; + } + } + + @Override + public Object getAttribute(String attributeName) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + return databaseName; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + return clusterName; + } + + return super.getAttribute(attributeName); + } + + @Override + public void setAttribute(String attributeName, String attributeValue) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + databaseName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + clusterName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + default: + super.setAttribute(attributeName, attributeValue); + break; + } + } + + @Override + public void transformComplete() { + if (isCustomerAttributeUpdated) { + entity.setAttribute(NAME_ATTRIBUTE, databaseName); + entity.setAttribute(CLUSTER_NAME_ATTRIBUTE, clusterName); + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName()); + } + } + + private String toQualifiedName() { + return String.format("%s@%s", databaseName, clusterName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/HiveStorageDescriptorEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/HiveStorageDescriptorEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/HiveStorageDescriptorEntityHandler.java new file mode 100644 index 0000000..6a7b17b --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/HiveStorageDescriptorEntityHandler.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.*; + +public class HiveStorageDescriptorEntityHandler extends BaseEntityHandler { + private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE); + + + public HiveStorageDescriptorEntityHandler(List<AtlasEntityTransformer> transformers) { + super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES); + } + + @Override + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return isHiveStorageDescEntity(entity) ? new HiveStorageDescriptorEntity(entity) : null; + } + + private boolean isHiveStorageDescEntity(AtlasEntity entity) { + return StringUtils.equals(entity.getTypeName(), HIVE_STORAGE_DESCRIPTOR); + } + + public static class HiveStorageDescriptorEntity extends AtlasTransformableEntity { + private String databaseName; + private String tableName; + private String clusterName; + private String location; + private boolean isCustomerAttributeUpdated = false; + + + public HiveStorageDescriptorEntity(AtlasEntity entity) { + super(entity); + + this.location = (String) entity.getAttribute(LOCATION_ATTRIBUTE); + + String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE); + + if (qualifiedName != null) { + int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER); + int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER); + String clusterNameWithSuffix = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : ""; + + this.databaseName = (databaseSeparatorIdx != -1) ? qualifiedName.substring(0, databaseSeparatorIdx) : ""; + this.tableName = (databaseSeparatorIdx != -1 && clusterSeparatorIdx != -1) ? qualifiedName.substring(databaseSeparatorIdx + 1, clusterSeparatorIdx) : ""; + + if (StringUtils.isNotEmpty(clusterNameWithSuffix)) { + int idx = clusterNameWithSuffix.lastIndexOf(HIVE_STORAGEDESC_SUFFIX); + + this.clusterName = (idx != -1) ? clusterNameWithSuffix.substring(0, idx) : ""; + } else { + this.clusterName = ""; + } + } else { + this.databaseName = ""; + this.tableName = ""; + this.clusterName = ""; + } + } + + @Override + public Object getAttribute(String attributeName) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + return databaseName; + + case HIVE_TABLE_NAME_ATTRIBUTE: + return tableName; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + return clusterName; + } + + return super.getAttribute(attributeName); + } + + @Override + public void setAttribute(String attributeName, String attributeValue) { + switch (attributeName) { + case HIVE_DB_NAME_ATTRIBUTE: + databaseName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_TABLE_NAME_ATTRIBUTE: + tableName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + clusterName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + default: + super.setAttribute(attributeName, attributeValue); + break; + } + } + + @Override + public void transformComplete() { + if (isCustomerAttributeUpdated) { + entity.setAttribute(LOCATION_ATTRIBUTE, toLocation()); + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName()); + } + } + + private String toLocation() { + int lastPathIndex = location != null ? location.lastIndexOf(PATH_DELIMITER) : -1; + + return lastPathIndex != -1 ? location.substring(0, lastPathIndex) + PATH_DELIMITER + tableName : location; + } + + private String toQualifiedName() { + return String.format("%s.%s@%s_storage", databaseName, tableName, clusterName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/HiveTableEntityHandler.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/HiveTableEntityHandler.java b/intg/src/main/java/org/apache/atlas/entitytransform/HiveTableEntityHandler.java new file mode 100644 index 0000000..b008e6c --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/HiveTableEntityHandler.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; + +import java.util.Arrays; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.*; + +public class HiveTableEntityHandler extends BaseEntityHandler { + private static final List<String> CUSTOM_TRANSFORM_ATTRIBUTES = Arrays.asList(HIVE_DB_NAME_ATTRIBUTE, HIVE_TABLE_NAME_ATTRIBUTE, HIVE_DB_CLUSTER_NAME_ATTRIBUTE); + + + public HiveTableEntityHandler(List<AtlasEntityTransformer> transformers) { + super(transformers, CUSTOM_TRANSFORM_ATTRIBUTES); + } + + @Override + public AtlasTransformableEntity getTransformableEntity(AtlasEntity entity) { + return isHiveTableEntity(entity) ? new HiveTableEntity(entity) : null; + } + + private boolean isHiveTableEntity(AtlasEntity entity) { + return StringUtils.equals(entity.getTypeName(), HIVE_TABLE); + } + + private static class HiveTableEntity extends AtlasTransformableEntity { + private String databaseName; + private String tableName; + private String clusterName; + private boolean isCustomerAttributeUpdated = false; + + + public HiveTableEntity(AtlasEntity entity) { + super(entity); + + this.tableName = (String) entity.getAttribute(NAME_ATTRIBUTE); + + String qualifiedName = (String) entity.getAttribute(QUALIFIED_NAME_ATTRIBUTE); + + if (qualifiedName != null) { + int databaseSeparatorIdx = qualifiedName.indexOf(DATABASE_DELIMITER); + int clusterSeparatorIdx = qualifiedName.lastIndexOf(CLUSTER_DELIMITER); + + this.databaseName = databaseSeparatorIdx != -1 ? qualifiedName.substring(0, databaseSeparatorIdx) : ""; + this.clusterName = clusterSeparatorIdx != -1 ? qualifiedName.substring(clusterSeparatorIdx + 1) : ""; + } else { + this.databaseName = ""; + this.clusterName = ""; + } + } + + @Override + public Object getAttribute(String attributeName) { + switch (attributeName) { + case HIVE_TABLE_NAME_ATTRIBUTE: + return tableName; + + case HIVE_DB_NAME_ATTRIBUTE: + return databaseName; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + return clusterName; + } + + return super.getAttribute(attributeName); + } + + @Override + public void setAttribute(String attributeName, String attributeValue) { + switch (attributeName) { + case HIVE_TABLE_NAME_ATTRIBUTE: + tableName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_DB_NAME_ATTRIBUTE: + databaseName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + case HIVE_DB_CLUSTER_NAME_ATTRIBUTE: + clusterName = attributeValue; + + isCustomerAttributeUpdated = true; + break; + + default: + super.setAttribute(attributeName, attributeValue); + break; + } + } + + @Override + public void transformComplete() { + if (isCustomerAttributeUpdated) { + entity.setAttribute(NAME_ATTRIBUTE, tableName); + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, toQualifiedName()); + } + } + + + private String toQualifiedName() { + return String.format("%s.%s@%s", databaseName, tableName, clusterName); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/entitytransform/TransformationConstants.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/entitytransform/TransformationConstants.java b/intg/src/main/java/org/apache/atlas/entitytransform/TransformationConstants.java new file mode 100644 index 0000000..51c3ace --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/entitytransform/TransformationConstants.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +public final class TransformationConstants { + public static final String HDFS_PATH = "hdfs_path"; + public static final String HIVE_DATABASE = "hive_db"; + public static final String HIVE_TABLE = "hive_table"; + public static final String HIVE_COLUMN = "hive_column"; + public static final String HIVE_STORAGE_DESCRIPTOR = "hive_storagedesc"; + + public static final String NAME_ATTRIBUTE = "name"; + public static final String QUALIFIED_NAME_ATTRIBUTE = "qualifiedName"; + public static final String CLUSTER_NAME_ATTRIBUTE = "clusterName"; + public static final String LOCATION_ATTRIBUTE = "location"; + public static final String PATH_ATTRIBUTE = "path"; + + public static final String HIVE_DB_NAME_ATTRIBUTE = "hive_db.name"; + public static final String HIVE_DB_CLUSTER_NAME_ATTRIBUTE = "hive_db.clusterName"; + public static final String HIVE_TABLE_NAME_ATTRIBUTE = "hive_table.name"; + public static final String HIVE_COLUMN_NAME_ATTRIBUTE = "hive_column.name"; + public static final String HDFS_PATH_NAME_ATTRIBUTE = "hdfs_path.name"; + public static final String HDFS_PATH_PATH_ATTRIBUTE = "hdfs_path.path"; + public static final String HDFS_CLUSTER_NAME_ATTRIBUTE = "hdfs_path.clusterName"; + + public static final char TYPE_NAME_ATTRIBUTE_NAME_SEP = '.'; + public static final char CLUSTER_DELIMITER = '@'; + public static final char DATABASE_DELIMITER = '.'; + public static final char PATH_DELIMITER = '/'; + public static final String HIVE_STORAGEDESC_SUFFIX = "_storage"; + + +} http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/main/java/org/apache/atlas/model/impexp/AttributeTransform.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/impexp/AttributeTransform.java b/intg/src/main/java/org/apache/atlas/model/impexp/AttributeTransform.java new file mode 100644 index 0000000..621f5f3 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/impexp/AttributeTransform.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.model.impexp; + +import org.apache.commons.lang.StringUtils; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + + +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class AttributeTransform implements Serializable { + private Map<String, String> conditions; + private Map<String, String> action; + + public AttributeTransform() { } + + public AttributeTransform(Map<String, String> conditions, Map<String, String> action) { + this.conditions = conditions; + this.action = action; + } + + public Map<String, String> getConditions() { + return conditions; + } + + public void setConditions(Map<String, String> conditions) { + this.conditions = conditions; + } + + public Map<String, String> getAction() { + return action; + } + + public void setAction(Map<String, String> action) { + this.action = action; + } + + public void addCondition(String attributeName, String conditionValue) { + if (conditions == null) { + conditions = new HashMap<>(); + } + + if (StringUtils.isNotEmpty(attributeName) && StringUtils.isNotEmpty(conditionValue)) { + conditions.put(attributeName, conditionValue); + } + } + + public void addAction(String attributeName, String actionValue) { + if (action == null) { + action = new HashMap<>(); + } + + if (StringUtils.isNotEmpty(attributeName) && StringUtils.isNotEmpty(actionValue)) { + action.put(attributeName, actionValue); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java ---------------------------------------------------------------------- diff --git a/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java b/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java new file mode 100644 index 0000000..69fba1e --- /dev/null +++ b/intg/src/test/java/org/apache/atlas/entitytransform/TransformationHandlerTest.java @@ -0,0 +1,370 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.entitytransform; + +import org.apache.atlas.model.impexp.AttributeTransform; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.commons.lang.StringUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.atlas.entitytransform.TransformationConstants.HDFS_PATH; + +public class TransformationHandlerTest { + @Test + public void testHdfsClusterRenameHandler() { + // Rename clusterName from cl1 to cl2 + AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"), + Collections.singletonMap("hdfs_path.clusterName", "SET: cl2")); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1)); + + for (AtlasEntity hdfsPath : getHdfsPathEntities()) { + String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName"); + boolean endsWithCl1 = qualifiedName.endsWith("@cl1"); + + applyTransforms(hdfsPath, handlers); + + String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); + + if (endsWithCl1) { + Assert.assertTrue(transformedValue.endsWith("@cl2"), transformedValue + ": expected to end with @cl2"); + } else { + Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + } + } + } + + @Test + public void testHdfsClusterNameToggleCaseHandler() { + // Change clusterName to Upper case + AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: cl1"), + Collections.singletonMap("hdfs_path.clusterName", "TO_UPPER:")); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1)); + + List<AtlasEntity> hdfsPaths = getHdfsPathEntities(); + + for (AtlasEntity hdfsPath : hdfsPaths) { + String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName"); + boolean endsWithCl1 = qualifiedName.endsWith("@cl1"); + + applyTransforms(hdfsPath, handlers); + + String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); + + if (endsWithCl1) { + Assert.assertTrue(transformedValue.endsWith("@CL1"), transformedValue + ": expected to end with @CL1"); + } else { + Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + } + } + + // Change clusterName back to lower case + AttributeTransform p2 = new AttributeTransform(Collections.singletonMap("hdfs_path.clusterName", "EQUALS: CL1"), + Collections.singletonMap("hdfs_path.clusterName", "TO_LOWER:")); + + handlers = initializeHandlers(Collections.singletonList(p2)); + + for (AtlasEntity hdfsPath : hdfsPaths) { + String qualifiedName = (String) hdfsPath.getAttribute("qualifiedName"); + boolean endsWithCL1 = qualifiedName.endsWith("@CL1"); + + applyTransforms(hdfsPath, handlers); + + String transformedValue = (String) hdfsPath.getAttribute("qualifiedName"); + + if (endsWithCL1) { + Assert.assertTrue(transformedValue.endsWith("@cl1"), transformedValue + ": expected to end with @cl1"); + } else { + Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + } + } + } + + @Test + public void testHdfsPathNameReplacePrefixHandler() { + // Prefix replace hdfs_path name from /aa/bb/ to /xx/yy/ + AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hdfs_path.name", "STARTS_WITH: /aa/bb/"), + Collections.singletonMap("hdfs_path.name", "REPLACE_PREFIX: = :/aa/bb/=/xx/yy/")); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1)); + + for (AtlasEntity hdfsPath : getHdfsPathEntities()) { + String name = (String) hdfsPath.getAttribute("name"); + boolean startsWith_aa_bb_ = name.startsWith("/aa/bb/"); + + applyTransforms(hdfsPath, handlers); + + String transformedValue = (String) hdfsPath.getAttribute("name"); + + if (startsWith_aa_bb_) { + Assert.assertTrue(transformedValue.startsWith("/xx/yy/"), transformedValue + ": expected to start with /xx/yy/"); + } else { + Assert.assertEquals(name, transformedValue, "not expected to change"); + } + } + } + + @Test + public void testHiveDatabaseClusterRenameHandler() { + // replace clusterName: from cl1 to cl1_backup + AttributeTransform p1 = new AttributeTransform(Collections.singletonMap("hive_db.clusterName", "EQUALS: cl1"), + Collections.singletonMap("hive_db.clusterName", "SET: cl1_backup")); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p1)); + + for (AtlasEntity entity : getAllEntities()) { + String qualifiedName = (String) entity.getAttribute("qualifiedName"); + boolean isHdfsPath = StringUtils.equals(entity.getTypeName(), HDFS_PATH); + boolean endsWithCl1 = qualifiedName.endsWith("@cl1"); + boolean containsCl1 = qualifiedName.contains("@cl1"); // for stroage_desc + + applyTransforms(entity, handlers); + + String transformedValue = (String) entity.getAttribute("qualifiedName"); + + if (!isHdfsPath && endsWithCl1) { + Assert.assertTrue(transformedValue.endsWith("@cl1_backup"), transformedValue + ": expected to end with @cl1_backup"); + } else if (!isHdfsPath && containsCl1) { + Assert.assertTrue(transformedValue.contains("@cl1_backup"), transformedValue + ": expected to contains @cl1_backup"); + } else { + Assert.assertEquals(qualifiedName, transformedValue, "not expected to change"); + } + } + } + + @Test + public void testHiveDatabaseNameRenameHandler() { + // replace dbName: from hr to hr_backup + AttributeTransform p = new AttributeTransform(Collections.singletonMap("hive_db.name", "EQUALS: hr"), + Collections.singletonMap("hive_db.name", "SET: hr_backup")); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p)); + + for (AtlasEntity entity : getAllEntities()) { + String qualifiedName = (String) entity.getAttribute("qualifiedName"); + boolean startsWithHrDot = qualifiedName.startsWith("hr."); // for tables, columns + boolean startsWithHrAt = qualifiedName.startsWith("hr@"); // for databases + + applyTransforms(entity, handlers); + + if (startsWithHrDot) { + Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup.")); + } else if (startsWithHrAt) { + Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr_backup@")); + } else { + Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + } + } + } + + @Test + public void testHiveTableNameRenameHandler() { + // replace tableName: from hr.employees to hr.employees_backup + AttributeTransform p = new AttributeTransform(); + p.addCondition("hive_db.name", "EQUALS: hr"); + p.addCondition("hive_table.name", "EQUALS: employees"); + p.addAction("hive_table.name", "SET: employees_backup"); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p)); + + for (AtlasEntity entity : getAllEntities()) { + String qualifiedName = (String) entity.getAttribute("qualifiedName"); + boolean startsWithHrEmployeesDot = qualifiedName.startsWith("hr.employees."); // for columns + boolean startsWithHrEmployeesAt = qualifiedName.startsWith("hr.employees@"); // for tables + + applyTransforms(entity, handlers); + + if (startsWithHrEmployeesDot) { + Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup.")); + } else if (startsWithHrEmployeesAt) { + Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees_backup@")); + } else { + Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + } + } + } + + @Test + public void testHiveColumnNameRenameHandler() { + // replace columnName: from hr.employees.age to hr.employees.age_backup + AttributeTransform p = new AttributeTransform(); + p.addCondition("hive_db.name", "EQUALS: hr"); + p.addCondition("hive_table.name", "EQUALS: employees"); + p.addCondition("hive_column.name", "EQUALS: age"); + p.addAction("hive_column.name", "SET: age_backup"); + + List<BaseEntityHandler> handlers = initializeHandlers(Collections.singletonList(p)); + + for (AtlasEntity entity : getAllEntities()) { + String qualifiedName = (String) entity.getAttribute("qualifiedName"); + boolean startsWithHrEmployeesAgeAt = qualifiedName.startsWith("hr.employees.age@"); + + applyTransforms(entity, handlers); + + if (startsWithHrEmployeesAgeAt) { + Assert.assertTrue(((String) entity.getAttribute("qualifiedName")).startsWith("hr.employees.age_backup@")); + } else { + Assert.assertEquals(qualifiedName, (String) entity.getAttribute("qualifiedName"), "not expected to change"); + } + } + } + + private List<BaseEntityHandler> initializeHandlers(List<AttributeTransform> params) { + return BaseEntityHandler.createEntityHandlers(params); + } + + private void applyTransforms(AtlasEntity entity, List<BaseEntityHandler> handlers) { + for (BaseEntityHandler handler : handlers) { + handler.transform(entity); + } + } + + final String[] clusterNames = new String[] { "cl1", "prod" }; + final String[] databaseNames = new String[] { "hr", "sales", "engg" }; + final String[] tableNames = new String[] { "employees", "products", "invoice" }; + final String[] columnNames = new String[] { "name", "age", "dob" }; + + private List<AtlasEntity> getHdfsPathEntities() { + List<AtlasEntity> ret = new ArrayList<>(); + + for (String clusterName : clusterNames) { + ret.add(getHdfsPathEntity1(clusterName)); + ret.add(getHdfsPathEntity2(clusterName)); + } + + return ret; + } + + private List<AtlasEntity> getAllEntities() { + List<AtlasEntity> ret = new ArrayList<>(); + + for (String clusterName : clusterNames) { + ret.add(getHdfsPathEntity1(clusterName)); + ret.add(getHdfsPathEntity2(clusterName)); + + for (String databaseName : databaseNames) { + ret.add(getHiveDbEntity(clusterName, databaseName)); + + for (String tableName : tableNames) { + ret.add(getHiveTableEntity(clusterName, databaseName, tableName)); + ret.add(getHiveStorageDescriptorEntity(clusterName, databaseName, tableName)); + + for (String columnName : columnNames) { + ret.add(getHiveColumnEntity(clusterName, databaseName, tableName, columnName)); + } + } + } + } + + return ret; + } + + private AtlasEntity getHdfsPathEntity1(String clusterName) { + AtlasEntity entity = new AtlasEntity(HDFS_PATH); + + entity.setAttribute("name", "/aa/bb/employee"); + entity.setAttribute("path", "hdfs://localhost.localdomain:8020/aa/bb/employee"); + entity.setAttribute("qualifiedName", "hdfs://localhost.localdomain:8020/aa/bb/employee@" + clusterName); + entity.setAttribute("clusterName", clusterName); + entity.setAttribute("isSymlink", false); + entity.setAttribute("modifiedTime", 0); + entity.setAttribute("isFile", false); + entity.setAttribute("numberOfReplicas", 0); + entity.setAttribute("createTime", 0); + entity.setAttribute("fileSize", 0); + + return entity; + } + + private AtlasEntity getHdfsPathEntity2(String clusterName) { + AtlasEntity entity = new AtlasEntity(HDFS_PATH); + + entity.setAttribute("name", "/cc/dd/employee"); + entity.setAttribute("path", "hdfs://localhost.localdomain:8020/cc/dd/employee"); + entity.setAttribute("qualifiedName", "hdfs://localhost.localdomain:8020/cc/dd/employee@" + clusterName); + entity.setAttribute("clusterName", clusterName); + entity.setAttribute("isSymlink", false); + entity.setAttribute("modifiedTime", 0); + entity.setAttribute("isFile", false); + entity.setAttribute("numberOfReplicas", 0); + entity.setAttribute("createTime", 0); + entity.setAttribute("fileSize", 0); + + return entity; + } + + private AtlasEntity getHiveDbEntity(String clusterName, String dbName) { + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_DATABASE); + + entity.setAttribute("name", dbName); + entity.setAttribute("qualifiedName", dbName + "@" + clusterName); + entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db"); + entity.setAttribute("clusterName", clusterName); + entity.setAttribute("owner", "hive"); + entity.setAttribute("ownerType", "USER"); + + return entity; + } + + private AtlasEntity getHiveTableEntity(String clusterName, String dbName, String tableName) { + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_TABLE); + + entity.setAttribute("name", tableName); + entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName); + entity.setAttribute("owner", "hive"); + entity.setAttribute("temporary", false); + entity.setAttribute("lastAccessTime", "1535656355000"); + entity.setAttribute("tableType", "EXTERNAL_TABLE"); + entity.setAttribute("createTime", "1535656355000"); + entity.setAttribute("retention", 0); + + return entity; + } + + private AtlasEntity getHiveStorageDescriptorEntity(String clusterName, String dbName, String tableName) { + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_STORAGE_DESCRIPTOR); + + entity.setAttribute("qualifiedName", dbName + "." + tableName + "@" + clusterName + "_storage"); + entity.setAttribute("storedAsSubDirectories", false); + entity.setAttribute("location", "hdfs://localhost.localdomain:8020/warehouse/tablespace/managed/hive/" + dbName + ".db" + "/" + tableName); + entity.setAttribute("compressed", false); + entity.setAttribute("inputFormat", "org.apache.hadoop.mapred.TextInputFormat"); + entity.setAttribute("outputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"); + entity.setAttribute("numBuckets", -1); + + return entity; + } + + private AtlasEntity getHiveColumnEntity(String clusterName, String dbName, String tableName, String columnName) { + AtlasEntity entity = new AtlasEntity(TransformationConstants.HIVE_COLUMN); + + entity.setAttribute("owner", "hive"); + entity.setAttribute("qualifiedName", dbName + "." + tableName + "." + columnName +"@" + clusterName); + entity.setAttribute("name", columnName); + entity.setAttribute("position", 1); + entity.setAttribute("type", "string"); + + return entity; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/dde93556/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java index 7bc3536..4ce2328 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportTransformer.java @@ -23,7 +23,6 @@ import org.apache.atlas.model.instance.AtlasClassification; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.commons.lang.StringUtils; -import scala.Tuple3; import java.util.ArrayList; import java.util.List;