Repository: atlas
Updated Branches:
  refs/heads/master ac0764bee -> faeecf101


ATLAS-2316: when Hive table is created Atlas audit shows ENTITY_UPDATE instead of ENTITY_CREATE


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/faeecf10
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/faeecf10
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/faeecf10

Branch: refs/heads/master
Commit: faeecf10130bb2745dd41a74feb8709752d15951
Parents: ac0764b
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Sun Dec 17 23:03:40 2017 -0800
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Wed Dec 20 22:41:36 2017 -0800

----------------------------------------------------------------------
 .../model/instance/EntityMutationResponse.java  |  8 ++
 .../org/apache/atlas/utils/AtlasEntityUtil.java | 86 ++++++++++++++++++++
 .../store/graph/v1/AtlasEntityStoreV1.java      | 35 ++++++--
 3 files changed, 124 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
index 97c9084..05411d6 100644
--- a/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
+++ b/intg/src/main/java/org/apache/atlas/model/instance/EntityMutationResponse.java
@@ -191,7 +191,15 @@ public class EntityMutationResponse {
         return 
getFirstEntityByType(getEntitiesByOperation(EntityOperation.UPDATE), typeName);
     }
 
+    @JsonIgnore
     public void addEntity(EntityOperation op, AtlasEntityHeader header) {
+        // if an entity is already included in CREATE, ignore subsequent 
UPDATE, PARTIAL_UPDATE
+        if (op == EntityOperation.UPDATE || op == 
EntityOperation.PARTIAL_UPDATE) {
+            if (entityHeaderExists(getCreatedEntities(), header)) {
+                return;
+            }
+        }
+
         if (mutatedEntities == null) {
             mutatedEntities = new HashMap<>();
         }

http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
----------------------------------------------------------------------
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
new file mode 100644
index 0000000..e237e86
--- /dev/null
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.utils;
+
+
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.type.AtlasArrayType;
+import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasMapType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
+import org.apache.atlas.type.AtlasType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+
+
+public class AtlasEntityUtil { // helper for detecting attribute-level differences between two AtlasEntity instances
+    private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityUtil.class);
+
+    public static boolean hasAnyAttributeUpdate(AtlasEntityType entityType, AtlasEntity currEntity, AtlasEntity newEntity) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("==> hasAnyAttributeUpdate(guid={}, typeName={})", currEntity.getGuid(), currEntity.getTypeName());
+        }
+
+        boolean ret = false; // result: true once an attribute is found whose normalized values differ
+
+        for (AtlasAttribute attribute : entityType.getAllAttributes().values()) {
+            String    attrName  = attribute.getName();
+            AtlasType attrType  = attribute.getAttributeType();
+            Object    currValue = attrType.getNormalizedValue(currEntity.getAttribute(attrName));
+            Object    newValue  = attrType.getNormalizedValue(newEntity.getAttribute(attrName));
+
+            if (!Objects.equals(currValue, newValue)) {
+                ret = true; // assume an update; reset below only for the null-vs-empty case
+
+                // for map/list types, treat 'null' same as empty
+                if ((currValue == null && newValue != null) || (currValue != null && newValue == null)) {
+                    if (attrType instanceof AtlasMapType) {
+                        if (MapUtils.isEmpty((Map) currValue) && MapUtils.isEmpty((Map) newValue)) {
+                            ret = false;
+                        }
+                    } else if (attrType instanceof AtlasArrayType) {
+                        if (CollectionUtils.isEmpty((Collection) currValue) && CollectionUtils.isEmpty((Collection) newValue)) {
+                            ret = false;
+                        }
+                    }
+                }
+
+                if (ret) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("hasAnyAttributeUpdate(guid={}, typeName={}): attribute '{}' is found updated - currentValue={}, newValue={}",
+                                  currEntity.getGuid(), currEntity.getTypeName(), attrName, currValue, newValue);
+                    }
+
+                    break; // one differing attribute is enough; skip the remaining attributes
+                }
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("<== hasAnyAttributeUpdate(guid={}, typeName={}): ret={}", currEntity.getGuid(), currEntity.getTypeName(), ret);
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/faeecf10/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
index b0f9845..a020e9f 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/AtlasEntityStoreV1.java
@@ -39,6 +39,7 @@ import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.type.AtlasTypeUtil;
+import org.apache.atlas.utils.AtlasEntityUtil;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -47,11 +48,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import javax.inject.Inject;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DELETE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
@@ -173,6 +170,34 @@ public class AtlasEntityStoreV1 implements AtlasEntityStore {
         // Create/Update entities
         EntityMutationContext context = preCreateOrUpdate(entityStream, 
entityGraphMapper, isPartialUpdate);
 
+        // for existing entities, skip update if incoming entity doesn't have 
any change
+        if (CollectionUtils.isNotEmpty(context.getUpdatedEntities())) {
+            EntityGraphRetriever entityRetriever = new 
EntityGraphRetriever(typeRegistry);
+
+            List<AtlasEntity> entitiesToSkipUpdate = null;
+            for (AtlasEntity entity : context.getUpdatedEntities()) {
+                String          guid          = entity.getGuid();
+                AtlasVertex     vertex        = context.getVertex(guid);
+                AtlasEntity     entityInStore = 
entityRetriever.toAtlasEntity(vertex);
+                AtlasEntityType entityType    = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+                if (!AtlasEntityUtil.hasAnyAttributeUpdate(entityType, entity, 
entityInStore)) {
+                    // if classifications are to be replaced as well, then 
skip updates only when no change in classifications as well
+                    if (!replaceClassifications || 
Objects.equals(entity.getClassifications(), 
entityInStore.getClassifications())) {
+                        if (entitiesToSkipUpdate == null) {
+                            entitiesToSkipUpdate = new ArrayList<>();
+                        }
+
+                        entitiesToSkipUpdate.add(entity);
+                    }
+                }
+            }
+
+            if (entitiesToSkipUpdate != null) {
+                context.getUpdatedEntities().removeAll(entitiesToSkipUpdate);
+            }
+        }
+
         EntityMutationResponse ret = 
entityGraphMapper.mapAttributesAndClassifications(context, isPartialUpdate, 
replaceClassifications);
 
         ret.setGuidAssignments(context.getGuidAssignments());

Reply via email to