Repository: atlas Updated Branches: refs/heads/master ac0764bee -> faeecf101
ATLAS-2316: when Hive table is created Atlas audit shows ENTITY_UPDATE instead of ENTITY_CREATE Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/faeecf10 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/faeecf10 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/faeecf10 Branch: refs/heads/master Commit: faeecf10130bb2745dd41a74feb8709752d15951 Parents: ac0764b Author: Madhan Neethiraj <mad...@apache.org> Authored: Sun Dec 17 23:03:40 2017 -0800 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Wed Dec 20 22:41:36 2017 -0800 ---------------------------------------------------------------------- .../model/instance/EntityMutationResponse.java | 8 ++ .../org/apache/atlas/utils/AtlasEntityUtil.java | 86 ++++++++++++++++++++ .../store/graph/v1/AtlasEntityStoreV1.java | 35 ++++++-- 3 files changed, 124 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java index 97c9084..05411d6 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java @@ -191,7 +191,15 @@ public class EntityMutationResponse { return getFirstEntityByType(getEntitiesByOperation(EntityOperation.UPDATE), typeName); } + @JsonIgnore public void addEntity(EntityOperation op, AtlasEntityHeader header) { + // if an entity is already included in CREATE, ignore subsequent UPDATE, PARTIAL_UPDATE + if (op == EntityOperation.UPDATE || op == EntityOperation.PARTIAL_UPDATE) { + if (entityHeaderExists(getCreatedEntities(), header)) { + return; + } + } + if (mutatedEntities == null) { mutatedEntities = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java new file mode 100644 index 0000000..e237e86 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.utils; + + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.type.AtlasArrayType; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasMapType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + + +public class AtlasEntityUtil { + private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityUtil.class); + + public static boolean hasAnyAttributeUpdate(AtlasEntityType entityType, AtlasEntity currEntity, AtlasEntity newEntity) { + if (LOG.isDebugEnabled()) { + LOG.debug("==> hasAnyAttributeUpdate(guid={}, typeName={})", currEntity.getGuid(), currEntity.getTypeName()); + } + + boolean ret = false; + + for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { + String attrName = attribute.getName(); + AtlasType attrType = attribute.getAttributeType(); + Object currValue = attrType.getNormalizedValue(currEntity.getAttribute(attrName)); + Object newValue = attrType.getNormalizedValue(newEntity.getAttribute(attrName)); + + if (!Objects.equals(currValue, newValue)) { + ret = true; + + // for map/list types, treat 'null' same as empty + if ((currValue == null && newValue != null) || (currValue != null && newValue == null)) { + if (attrType instanceof AtlasMapType) { + if (MapUtils.isEmpty((Map) currValue) && MapUtils.isEmpty((Map) newValue)) { + ret = false; + } + } else if (attrType instanceof AtlasArrayType) { + if (CollectionUtils.isEmpty((Collection) currValue) && CollectionUtils.isEmpty((Collection) newValue)) { + ret = false; + } + } + } + + if (ret) { + if (LOG.isDebugEnabled()) { + LOG.debug("hasAnyAttributeUpdate(guid={}, typeName={}): attribute '{}' is found updated - currentValue={}, newValue={}", + currEntity.getGuid(), currEntity.getTypeName(), attrName, currValue, newValue); + } + + break; + } + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== hasAnyAttributeUpdate(guid={}, typeName={}): ret={}", currEntity.getGuid(), currEntity.getTypeName(), ret); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java index b0f9845..a020e9f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java @@ -39,6 +39,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AtlasEntityUtil; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.lang3.StringUtils; @@ -47,11 +48,7 @@ import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE; import static org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE; @@ -173,6 +170,34 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore { // Create/Update entities EntityMutationContext context = preCreateOrUpdate(entityStream, entityGraphMapper, isPartialUpdate); + // for existing entities, skip update if incoming entity doesn't have any change + if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) { + EntityGraphRetriever entityRetriever = new EntityGraphRetriever(typeRegistry); + + List<AtlasEntity> entitiesToSkipUpdate = null; + for (AtlasEntity entity : context.getUpdatedEntities()) { + String guid = entity.getGuid(); + AtlasVertex vertex = context.getVertex(guid); + AtlasEntity entityInStore = entityRetriever.toAtlasEntity(vertex); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, entityInStore)) { + // if classifications are to be replaced as well, then skip updates only when no change in classifications as well + if (!replaceClassifications || Objects.equals(entity.getClassifications(), entityInStore.getClassifications())) { + if (entitiesToSkipUpdate == null) { + entitiesToSkipUpdate = new ArrayList<>(); + } + + entitiesToSkipUpdate.add(entity); + } + } + } + + if (entitiesToSkipUpdate != null) { + context.getUpdatedEntities().removeAll(entitiesToSkipUpdate); + } + } + EntityMutationResponse ret = entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, replaceClassifications); ret.setGuidAssignments(context.getGuidAssignments());