This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-1.0
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/branch-1.0 by this push:
     new 4898788  ATLAS-3054: improved batch processing in notification handler 
to avoid processing of an entity multiple times - #2
4898788 is described below

commit 48987881826c5cfe4f518d44e55a488535a13ef8
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Mon Mar 11 16:17:03 2019 -0700

    ATLAS-3054: improved batch processing in notification handler to avoid 
processing of an entity multiple times - #2
    
    (cherry picked from commit fc2a926cc7d0f0070c25f9afc68bf6a5a1bb6df2)
---
 addons/models/1000-Hadoop/1030-hive_model.json     |   2 +-
 .../org/apache/atlas/type/AtlasBuiltInTypes.java   |  13 +-
 .../org/apache/atlas/type/AtlasEntityType.java     |   4 +-
 .../apache/atlas/type/AtlasRelationshipType.java   |   7 +-
 .../org/apache/atlas/type/AtlasStructType.java     |  37 ++++-
 .../org/apache/atlas/utils/AtlasEntityUtil.java    |  33 ++++-
 .../atlas/discovery/EntityDiscoveryService.java    |  19 +--
 .../converters/AtlasStructFormatConverter.java     |   9 +-
 .../store/graph/v2/AtlasEntityStoreV2.java         |   2 +-
 .../store/graph/v2/EntityGraphMapper.java          |  79 ++++++++---
 .../notification/NotificationHookConsumer.java     | 153 ++++++++++++++++++++-
 .../preprocessor/HivePreprocessor.java             |   4 +-
 .../preprocessor/PreprocessorContext.java          |   4 +
 13 files changed, 305 insertions(+), 61 deletions(-)

diff --git a/addons/models/1000-Hadoop/1030-hive_model.json 
b/addons/models/1000-Hadoop/1030-hive_model.json
index 324d716..7207a41 100644
--- a/addons/models/1000-Hadoop/1030-hive_model.json
+++ b/addons/models/1000-Hadoop/1030-hive_model.json
@@ -508,7 +508,7 @@
             "serviceType": "hive",
             "typeVersion": "1.2",
             "relationshipCategory": "COMPOSITION",
-            "relationshipLabel": "__hive_table.partitionkeys",
+            "relationshipLabel": "__hive_table.partitionKeys",
             "endDef1": {
                 "type": "hive_table",
                 "name": "partitionKeys",
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
index 6bedf6d..ce14b5b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasBuiltInTypes.java
@@ -20,6 +20,7 @@ package org.apache.atlas.type;
 
 import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.AtlasRelatedObjectId;
 import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
@@ -672,19 +673,25 @@ public class AtlasBuiltInTypes {
 
         @Override
         public AtlasObjectId getNormalizedValue(Object obj) {
+            AtlasObjectId ret = null;
+
             if (obj != null) {
                 if (obj instanceof AtlasObjectId) {
-                    return (AtlasObjectId) obj;
+                    ret = (AtlasObjectId) obj;
                 } else if (obj instanceof Map) {
                     Map map = (Map) obj;
 
                     if (isValidMap(map)) {
-                        return new AtlasObjectId(map);
+                        if 
(map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                            ret = new AtlasRelatedObjectId(map);
+                        } else {
+                            ret = new AtlasObjectId(map);
+                        }
                     }
                 }
             }
 
-            return null;
+            return ret;
         }
 
         private boolean isValidMap(Map map) {
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
index 2557bb3..b5360c1 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasEntityType.java
@@ -799,7 +799,7 @@ public class AtlasEntityType extends AtlasStructType {
                 AtlasEntity entityObj = (AtlasEntity) obj;
 
                 for (String attributeName : relationshipAttributes.keySet()) {
-                    Object         value            = 
entityObj.getAttribute(attributeName);
+                    Object         value            = 
entityObj.getRelationshipAttribute(attributeName);
                     String         relationshipType = 
AtlasEntityUtil.getRelationshipType(value);
                     AtlasAttribute attribute        = 
getRelationshipAttribute(attributeName, relationshipType);
 
@@ -824,7 +824,7 @@ public class AtlasEntityType extends AtlasStructType {
                     }
                 }
             } else if (obj instanceof Map) {
-                Map attributes = AtlasTypeUtil.toStructAttributes((Map) obj);
+                Map attributes = AtlasTypeUtil.toRelationshipAttributes((Map) 
obj);
 
                 for (String attributeName : relationshipAttributes.keySet()) {
                     Object         value            = 
attributes.get(attributeName);
diff --git 
a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
index 3ea8d80..183772b 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasRelationshipType.java
@@ -159,7 +159,9 @@ public class AtlasRelationshipType extends AtlasStructType {
             AtlasRelationshipEdgeDirection end2Direction = IN;
 
             if (endDef1.getIsLegacyAttribute() && 
endDef2.getIsLegacyAttribute()) {
-                end2Direction = OUT;
+                if (relationshipDef.getRelationshipLabel() == null) { // only 
if label hasn't been overridden
+                    end2Direction = OUT;
+                }
             } else if (!endDef1.getIsLegacyAttribute() && 
endDef2.getIsLegacyAttribute()) {
                 end1Direction = IN;
                 end2Direction = OUT;
@@ -345,11 +347,12 @@ public class AtlasRelationshipType extends 
AtlasStructType {
             }
 
             attribute = new AtlasAttribute(entityType, attributeDef,
-                                           typeRegistry.getType(attrTypeName), 
relationshipLabel);
+                                           typeRegistry.getType(attrTypeName), 
getTypeName(), relationshipLabel);
 
         } else {
             // attribute already exists (legacy attribute which is also a 
relationship attribute)
             // add relationshipLabel information to existing attribute
+            attribute.setRelationshipName(getTypeName());
             attribute.setRelationshipEdgeLabel(relationshipLabel);
         }
 
diff --git a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java 
b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
index 84c76d7..0be7e18 100644
--- a/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
+++ b/intg/src/main/java/org/apache/atlas/type/AtlasStructType.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
+import static org.apache.atlas.model.TypeCategory.*;
 import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_PARAM_ATTRIBUTE;
 import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_INVERSE_REF;
 import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef.CONSTRAINT_TYPE_OWNED_REF;
@@ -701,19 +702,23 @@ public class AtlasStructType extends AtlasType {
         private final String                   vertexPropertyName;
         private final String                   vertexUniquePropertyName;
         private final boolean                  isOwnedRef;
+        private final boolean                  isObjectRef;
         private final String                   inverseRefAttributeName;
         private AtlasAttribute                 inverseRefAttribute;
+        private String                         relationshipName;
         private String                         relationshipEdgeLabel;
         private AtlasRelationshipEdgeDirection relationshipEdgeDirection;
 
-        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType, String relationshipLabel) {
+        public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType, String relationshipName, String 
relationshipLabel) {
             this.definedInType            = definedInType;
             this.attributeDef             = attrDef;
             this.attributeType            = 
attributeType.getTypeForAttribute();
             this.qualifiedName            = 
getQualifiedAttributeName(definedInType.getStructDef(), attributeDef.getName());
             this.vertexPropertyName       = 
encodePropertyKey(this.qualifiedName);
             this.vertexUniquePropertyName = attrDef.getIsUnique() ? 
encodePropertyKey(getQualifiedAttributeName(definedInType.getStructDef(), 
UNIQUE_ATTRIBUTE_SHADE_PROPERTY_PREFIX + attributeDef.getName())) : null;
+            this.relationshipName         = relationshipName;
             this.relationshipEdgeLabel    = 
getRelationshipEdgeLabel(relationshipLabel);
+
             boolean isOwnedRef            = false;
             String  inverseRefAttribute   = null;
 
@@ -736,10 +741,32 @@ public class AtlasStructType extends AtlasType {
             this.isOwnedRef                = isOwnedRef;
             this.inverseRefAttributeName   = inverseRefAttribute;
             this.relationshipEdgeDirection = 
AtlasRelationshipEdgeDirection.OUT;
+
+            switch (attributeType.getTypeCategory()) {
+                case OBJECT_ID_TYPE:
+                    isObjectRef = true;
+                break;
+
+                case MAP:
+                    AtlasMapType mapType = (AtlasMapType) attributeType;
+
+                    isObjectRef = mapType.getValueType().getTypeCategory() == 
OBJECT_ID_TYPE;
+                break;
+
+                case ARRAY:
+                    AtlasArrayType arrayType = (AtlasArrayType) attributeType;
+
+                    isObjectRef = arrayType.getElementType().getTypeCategory() 
== OBJECT_ID_TYPE;
+                break;
+
+                default:
+                    isObjectRef = false;
+                break;
+            }
         }
 
         public AtlasAttribute(AtlasStructType definedInType, AtlasAttributeDef 
attrDef, AtlasType attributeType) {
-            this(definedInType, attrDef, attributeType, null);
+            this(definedInType, attrDef, attributeType, null, null);
         }
 
         public AtlasStructType getDefinedInType() { return definedInType; }
@@ -766,12 +793,18 @@ public class AtlasStructType extends AtlasType {
 
         public boolean isOwnedRef() { return isOwnedRef; }
 
+        public boolean isObjectRef() { return isObjectRef; }
+
         public String getInverseRefAttributeName() { return 
inverseRefAttributeName; }
 
         public AtlasAttribute getInverseRefAttribute() { return 
inverseRefAttribute; }
 
         public void setInverseRefAttribute(AtlasAttribute inverseAttr) { 
inverseRefAttribute = inverseAttr; }
 
+        public String getRelationshipName() { return relationshipName; }
+
+        public void setRelationshipName(String relationshipName) { 
this.relationshipName = relationshipName; }
+
         public String getRelationshipEdgeLabel() { return 
relationshipEdgeLabel; }
 
         public void setRelationshipEdgeLabel(String relationshipEdgeLabel) { 
this.relationshipEdgeLabel = relationshipEdgeLabel; }
diff --git a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java 
b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
index 3002217..1e78e25 100644
--- a/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
+++ b/intg/src/main/java/org/apache/atlas/utils/AtlasEntityUtil.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -126,10 +127,38 @@ public class AtlasEntityUtil {
 
         if (val instanceof AtlasRelatedObjectId) {
             ret = ((AtlasRelatedObjectId) val).getRelationshipType();
+        } else if (val instanceof Collection) {
+            String elemRelationshipType = null;
+
+            for (Object elem : (Collection) val) {
+                elemRelationshipType = getRelationshipType(elem);
+
+                if (elemRelationshipType != null) {
+                    break;
+                }
+            }
+
+            ret = elemRelationshipType;
         } else if (val instanceof Map) {
-            Object relTypeName = ((Map) 
val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
+            Map mapValue = (Map) val;
+
+            if 
(mapValue.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                Object relTypeName = ((Map) 
val).get(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE);
 
-            ret = relTypeName != null ? relTypeName.toString() : null;
+                ret = relTypeName != null ? relTypeName.toString() : null;
+            } else {
+                String entryRelationshipType = null;
+
+                for (Object entryVal : mapValue.values()) {
+                    entryRelationshipType = getRelationshipType(entryVal);
+
+                    if (entryRelationshipType != null) {
+                        break;
+                    }
+                }
+
+                ret = entryRelationshipType;
+            }
         } else {
             ret = null;
         }
diff --git 
a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
 
b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
index 9df360c..19f81d3 100644
--- 
a/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
+++ 
b/repository/src/main/java/org/apache/atlas/discovery/EntityDiscoveryService.java
@@ -537,7 +537,7 @@ public class EntityDiscoveryService implements 
AtlasDiscoveryService {
         AtlasAttribute attribute = entityType.getAttribute(relation);
 
         if (attribute != null) {
-            if (isRelationshipAttribute(attribute)) {
+            if (attribute.isObjectRef()) {
                 relation = attribute.getRelationshipEdgeLabel();
             } else {
                 throw new 
AtlasBaseException(AtlasErrorCode.INVALID_RELATIONSHIP_ATTRIBUTE, relation, 
attribute.getTypeName());
@@ -790,23 +790,6 @@ public class EntityDiscoveryService implements 
AtlasDiscoveryService {
         return "";
     }
 
-    private boolean isRelationshipAttribute(AtlasAttribute attribute) throws 
AtlasBaseException {
-        boolean   ret      = true;
-        AtlasType attrType = attribute.getAttributeType();
-
-        if (attrType.getTypeCategory() == ARRAY) {
-            attrType = ((AtlasArrayType) attrType).getElementType();
-        } else if (attrType.getTypeCategory() == MAP) {
-            attrType = ((AtlasMapType) attrType).getValueType();
-        }
-
-        if (attrType.getTypeCategory() != OBJECT_ID_TYPE) {
-            ret = false;
-        }
-
-        return ret;
-    }
-
     private Set<String> getEntityStates() {
         return new HashSet<>(Arrays.asList(ACTIVE.toString(), 
DELETED.toString()));
     }
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
 
b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
index 51a6426..173fcee 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasStructFormatConverter.java
@@ -23,6 +23,7 @@ import org.apache.atlas.model.TypeCategory;
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasObjectId;
 import org.apache.atlas.model.instance.AtlasStruct;
+import org.apache.atlas.utils.AtlasEntityUtil;
 import org.apache.atlas.v1.model.instance.Struct;
 import org.apache.atlas.type.*;
 import org.apache.atlas.type.AtlasBuiltInTypes.AtlasObjectIdType;
@@ -134,11 +135,12 @@ public class AtlasStructFormatConverter extends 
AtlasAbstractFormatConverter {
 
             // Only process the requested/set attributes
             for (String attrName : attributes.keySet()) {
-                AtlasAttribute attr = structType.getAttribute(attrName);
+                Object         v2Value = attributes.get(attrName);
+                AtlasAttribute attr    = structType.getAttribute(attrName);
 
                 if (attr == null) {
                     if (isEntityType) {
-                        attr = ((AtlasEntityType) 
structType).getRelationshipAttribute(attrName, null);
+                        attr = ((AtlasEntityType) 
structType).getRelationshipAttribute(attrName, 
AtlasEntityUtil.getRelationshipType(v2Value));
                     }
 
                     if (attr == null) {
@@ -149,7 +151,6 @@ public class AtlasStructFormatConverter extends 
AtlasAbstractFormatConverter {
 
                 AtlasType            attrType      = attr.getAttributeType();
                 AtlasFormatConverter attrConverter = 
converterRegistry.getConverter(attrType.getTypeCategory());
-                Object               v2Value       = 
attributes.get(attr.getName());
 
                 if (v2Value != null && isEntityType && attr.isOwnedRef()) {
                     if (LOG.isDebugEnabled()) {
@@ -256,6 +257,7 @@ public class AtlasStructFormatConverter extends 
AtlasAbstractFormatConverter {
             // Only process the requested/set attributes
             for (Object attribKey : attributes.keySet()) {
                 String         attrName = attribKey.toString();
+                Object         v1Value  = attributes.get(attrName);
                 AtlasAttribute attr     = structType.getAttribute(attrName);
 
                 if (attr == null) {
@@ -271,7 +273,6 @@ public class AtlasStructFormatConverter extends 
AtlasAbstractFormatConverter {
 
                 AtlasType            attrType      = attr.getAttributeType();
                 AtlasFormatConverter attrConverter = 
converterRegistry.getConverter(attrType.getTypeCategory());
-                Object               v1Value       = attributes.get(attrName);
 
                 if (attrConverter.isValidValueV1(v1Value, attrType)) {
                     Object v2Value = attrConverter.fromV1ToV2(v1Value, 
attrType, context);
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
index a62f335..a5a6291 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java
@@ -323,7 +323,7 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore 
{
         AtlasAttribute    attr       = entityType.getAttribute(attrName);
 
         if (attr == null) {
-            attr = entityType.getRelationshipAttribute(attrName, null);
+            attr = entityType.getRelationshipAttribute(attrName, 
AtlasEntityUtil.getRelationshipType(attrValue));
 
             if (attr == null) {
                 throw new AtlasBaseException(AtlasErrorCode.UNKNOWN_ATTRIBUTE, 
attrName, entity.getTypeName());
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
index a6f1250..31b20ff 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java
@@ -79,8 +79,43 @@ import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.DE
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.PARTIAL_UPDATE;
 import static 
org.apache.atlas.model.instance.EntityMutations.EntityOperation.UPDATE;
 import static 
org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef.Cardinality.SET;
-import static org.apache.atlas.repository.Constants.*;
-import static org.apache.atlas.repository.graph.GraphHelper.*;
+import static org.apache.atlas.repository.Constants.ATTRIBUTE_KEY_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_GUID;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_ENTITY_STATUS;
+import static org.apache.atlas.repository.Constants.CLASSIFICATION_LABEL;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VALIDITY_PERIODS_KEY;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_PROPAGATE_KEY;
+import static 
org.apache.atlas.repository.Constants.CLASSIFICATION_VERTEX_REMOVE_PROPAGATIONS_KEY;
+import static org.apache.atlas.repository.Constants.CREATED_BY_KEY;
+import static org.apache.atlas.repository.Constants.ENTITY_TYPE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.HOME_ID_KEY;
+import static org.apache.atlas.repository.Constants.IS_PROXY_KEY;
+import static 
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY;
+import static org.apache.atlas.repository.Constants.PROVENANCE_TYPE_KEY;
+import static 
org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TIMESTAMP_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.TRAIT_NAMES_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.Constants.ATTRIBUTE_INDEX_PROPERTY_KEY;
+import static org.apache.atlas.repository.Constants.VERSION_PROPERTY_KEY;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getCollectionElementsUsingRelationship;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationEdge;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getDefaultRemovePropagations;
+import static 
org.apache.atlas.repository.graph.GraphHelper.getMapElementsProperty;
+import static org.apache.atlas.repository.graph.GraphHelper.getStatus;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitLabel;
+import static org.apache.atlas.repository.graph.GraphHelper.getTraitNames;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeName;
+import static org.apache.atlas.repository.graph.GraphHelper.getTypeNames;
+import static org.apache.atlas.repository.graph.GraphHelper.isActive;
+import static 
org.apache.atlas.repository.graph.GraphHelper.isPropagationEnabled;
+import static org.apache.atlas.repository.graph.GraphHelper.isRelationshipEdge;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+import static 
org.apache.atlas.repository.graph.GraphHelper.updateModificationMetadata;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.getIdFromVertex;
 import static 
org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.isReference;
 import static 
org.apache.atlas.type.AtlasStructType.AtlasAttribute.AtlasRelationshipEdgeDirection.IN;
@@ -827,27 +862,29 @@ public class EntityGraphMapper {
             throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
(ctx.getValue() == null ? null : ctx.getValue().toString()));
         }
 
-        String    attributeName = ctx.getAttribute().getName();
-        AtlasType type          = 
typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
-
-        AtlasRelationshipEdgeDirection edgeDirection = 
ctx.getAttribute().getRelationshipEdgeDirection();
+        AtlasType type = 
typeRegistry.getType(AtlasGraphUtilsV2.getTypeName(entityVertex));
 
         if (type instanceof AtlasEntityType) {
             AtlasEntityType entityType = (AtlasEntityType) type;
+            AtlasAttribute  attribute     = ctx.getAttribute();
+            String          attributeName = attribute.getName();
 
             // use relationship to create/update edges
             if (entityType.hasRelationshipAttribute(attributeName)) {
                 Map<String, Object> relationshipAttributes = 
getRelationshipAttributes(ctx.getValue());
 
                 if (ctx.getCurrentEdge() != null) {
-                    ret = updateRelationship(ctx.getCurrentEdge(), 
entityVertex, attributeVertex, edgeDirection, relationshipAttributes);
-
+                    ret = updateRelationship(ctx.getCurrentEdge(), 
entityVertex, attributeVertex, attribute.getRelationshipEdgeDirection(), 
relationshipAttributes);
                 } else {
-                    String      relationshipName = 
graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+                    String      relationshipName = 
attribute.getRelationshipName();
                     AtlasVertex fromVertex;
                     AtlasVertex toVertex;
 
-                    if (edgeDirection == IN) {
+                    if (StringUtils.isEmpty(relationshipName)) {
+                        relationshipName = 
graphHelper.getRelationshipTypeName(entityVertex, entityType, attributeName);
+                    }
+
+                    if (attribute.getRelationshipEdgeDirection() == IN) {
                         fromVertex = attributeVertex;
                         toVertex   = entityVertex;
 
@@ -1106,21 +1143,29 @@ public class EntityGraphMapper {
     }
 
     private static AtlasObjectId getObjectId(Object val) throws 
AtlasBaseException {
+        AtlasObjectId ret = null;
+
         if (val != null) {
             if ( val instanceof  AtlasObjectId) {
-                return ((AtlasObjectId) val);
+                ret = ((AtlasObjectId) val);
             } else if (val instanceof Map) {
-                AtlasObjectId ret = new AtlasObjectId((Map)val);
+                Map map = (Map) val;
 
-                if (AtlasTypeUtil.isValid(ret)) {
-                    return ret;
+                if 
(map.containsKey(AtlasRelatedObjectId.KEY_RELATIONSHIP_TYPE)) {
+                    ret = new AtlasRelatedObjectId(map);
+                } else {
+                    ret = new AtlasObjectId((Map) val);
                 }
-            }
 
-            throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
val.toString());
+                if (!AtlasTypeUtil.isValid(ret)) {
+                    throw new 
AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, val.toString());
+                }
+            } else {
+                throw new AtlasBaseException(AtlasErrorCode.INVALID_OBJECT_ID, 
val.toString());
+            }
         }
 
-        return null;
+        return ret;
     }
 
     private static String getGuid(Object val) throws AtlasBaseException {
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index f150f5e..8d8f718 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -35,6 +35,7 @@ import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.instance.EntityMutationResponse;
 import org.apache.atlas.model.notification.HookNotification;
 import 
org.apache.atlas.model.notification.HookNotification.EntityCreateRequestV2;
 import 
org.apache.atlas.model.notification.HookNotification.EntityDeleteRequestV2;
@@ -56,12 +57,14 @@ import 
org.apache.atlas.repository.store.graph.v2.AtlasEntityStream;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.service.Service;
 import org.apache.atlas.type.AtlasEntityType;
+import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
 import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.utils.AtlasPerfTracer;
 import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kafka.common.TopicPartition;
@@ -75,6 +78,7 @@ import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -86,6 +90,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
+import static org.apache.atlas.model.instance.AtlasObjectId.*;
+import static 
org.apache.atlas.notification.preprocessor.EntityPreprocessor.TYPE_HIVE_PROCESS;
+
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -174,7 +181,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         consumerRetryInterval = 
applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
         minWaitDuration       = 
applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, 
consumerRetryInterval); // 500 ms  by default
         maxWaitDuration       = 
applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 
60);  //  30 sec by default
-        commitBatchSize       = 
applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 0);
+        commitBatchSize       = 
applicationProperties.getInt(CONSUMER_COMMIT_BATCH_SIZE, 50);
 
         skipHiveColumnLineageHive20633                = 
applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
false);
         skipHiveColumnLineageHive20633InputsThreshold = 
applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD,
 15); // skip if avg # of inputs is > 15
@@ -216,8 +223,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             hiveTablesCache = Collections.emptyMap();
         }
 
-        hiveTypesRemoveOwnedRefAttrs  = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
 false);
-        rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 false);
+        hiveTypesRemoveOwnedRefAttrs  = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
+        rdbmsTypesRemoveOwnedRefAttrs = 
applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS,
 true);
         preprocessEnabled             = !hiveTablesToIgnore.isEmpty() || 
!hiveTablesToPrune.isEmpty() || skipHiveColumnLineageHive20633 || 
hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs;
 
         LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, 
skipHiveColumnLineageHive20633);
@@ -693,6 +700,8 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             if (commitBatchSize <= 0 || entitiesList.size() <= 
commitBatchSize) {
                 atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
             } else {
+                Map<String, String> guidAssignments = new HashMap<>();
+
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx 
+= commitBatchSize) {
                     int toIndex = fromIdx + commitBatchSize;
 
@@ -700,10 +709,16 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                         toIndex = entitiesList.size();
                     }
 
-                    AtlasEntitiesWithExtInfo batch       = new 
AtlasEntitiesWithExtInfo(new ArrayList<>(entitiesList.subList(fromIdx, 
toIndex)));
+                    List<AtlasEntity> entitiesBatch = new 
ArrayList<>(entitiesList.subList(fromIdx, toIndex));
+
+                    updateProcessedEntityReferences(entitiesBatch, 
guidAssignments);
+
+                    AtlasEntitiesWithExtInfo batch       = new 
AtlasEntitiesWithExtInfo(entitiesBatch);
                     AtlasEntityStream        batchStream = new 
AtlasEntityStream(batch, entityStream);
 
-                    atlasEntityStore.createOrUpdate(batchStream, 
isPartialUpdate);
+                    EntityMutationResponse response = 
atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
+
+                    recordProcessedEntities(response, guidAssignments);
 
                     RequestContext.get().resetEntityGuidUpdates();
 
@@ -790,7 +805,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         PreprocessorContext context = new PreprocessorContext(kafkaMsg, 
hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, 
hiveTypesRemoveOwnedRefAttrs, rdbmsTypesRemoveOwnedRefAttrs);
 
-        if (!hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || 
hiveTypesRemoveOwnedRefAttrs) {
+        if (context.isHivePreprocessEnabled()) {
             preprocessHiveTypes(context);
         }
 
@@ -803,6 +818,29 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
 
         context.moveRegisteredReferredEntities();
+
+        if (context.isHivePreprocessEnabled() && 
CollectionUtils.isNotEmpty(context.getEntities())) {
+            // move hive_process and hive_column_lineage entities to end of 
the list
+            List<AtlasEntity> entities = context.getEntities();
+            int               count    = entities.size();
+
+            for (int i = 0; i < count; i++) {
+                AtlasEntity entity = entities.get(i);
+
+                switch (entity.getTypeName()) {
+                    case TYPE_HIVE_PROCESS:
+                    case TYPE_HIVE_COLUMN_LINEAGE:
+                        entities.remove(i--);
+                        entities.add(entity);
+                        count--;
+                    break;
+                }
+            }
+
+            if (entities.size() - count > 0) {
+                LOG.info("moved {} hive_process/hive_column_lineage entities 
to end of list (listSize={})", entities.size() - count, entities.size());
+            }
+        }
     }
 
     private void rdbmsTypeRemoveOwnedRefAttrs(PreprocessorContext context) {
@@ -885,7 +923,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                         if (lineageQNames.contains(qualifiedName)) {
                             entities.remove(i--);
 
-                            LOG.warn("removed duplicate hive_column_lineage 
entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, 
lineageInputsCount, context.getKafkaMessageOffset(), 
context.getKafkaPartition());
+                            LOG.warn("removed duplicate hive_column_lineage 
entity: qualifiedName={}. topic-offset={}, partition={}", qualifiedName, 
context.getKafkaMessageOffset(), context.getKafkaPartition());
 
                             numRemovedEntities++;
 
@@ -954,6 +992,107 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         return ret;
     }
 
+    /**
+     * Accumulates the guid assignments reported for one committed batch.
+     * <p>
+     * The entries from mutationResponse.getGuidAssignments() are added to guidAssignments,
+     * so that references to these entities in later batches can be rewritten to the
+     * assigned guids. Does nothing when the response is null or has no guid assignments.
+     *
+     * @param mutationResponse response of atlasEntityStore.createOrUpdate() for a batch; may be null
+     * @param guidAssignments  guid assignments accumulated across batches; updated in place
+     */
+    private void recordProcessedEntities(EntityMutationResponse 
mutationResponse, Map<String, String> guidAssignments) {
+        if (mutationResponse != null && 
MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
+            guidAssignments.putAll(mutationResponse.getGuidAssignments());
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("recordProcessedEntities: added {} guidAssignments. 
updatedSize={}", mutationResponse.getGuidAssignments().size(), 
guidAssignments.size());
+            }
+        }
+    }
+
+    /**
+     * Rewrites references in a batch of entities to point at guids already assigned
+     * by earlier batches (as recorded in guidAssignments).
+     * <p>
+     * For each entity whose type is found in the type registry:
+     * <ul>
+     *   <li>each non-null attribute value is rewritten when its attribute is an object
+     *       reference. The attribute is looked up first as a regular attribute, then as a
+     *       relationship attribute of the same name.</li>
+     *   <li>each non-null relationship-attribute value is rewritten.</li>
+     * </ul>
+     * Entities whose type is not in the registry are skipped. Values are modified in place.
+     *
+     * @param entities        entities of the batch about to be submitted
+     * @param guidAssignments guids assigned by previously processed batches; no-op when empty
+     */
+    private void updateProcessedEntityReferences(List<AtlasEntity> entities, 
Map<String, String> guidAssignments) {
+        if (CollectionUtils.isNotEmpty(entities) && 
MapUtils.isNotEmpty(guidAssignments)) {
+            for (AtlasEntity entity : entities) {
+                AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(entity.getTypeName());
+
+                if (entityType == null) {
+                    continue;
+                }
+
+                if (MapUtils.isNotEmpty(entity.getAttributes())) {
+                    for (Map.Entry<String, Object> entry : 
entity.getAttributes().entrySet()) {
+                        String         attrName  = entry.getKey();
+                        Object         attrValue = entry.getValue();
+
+                        if (attrValue == null) {
+                            continue;
+                        }
+
+                        AtlasAttribute attribute = 
entityType.getAttribute(attrName);
+
+                        if (attribute == null) { // look for a relationship 
attribute with the same name
+                            attribute = 
entityType.getRelationshipAttribute(attrName, null);
+                        }
+
+                        if (attribute != null && attribute.isObjectRef()) {
+                            updateProcessedEntityReferences(attrValue, 
guidAssignments);
+                        }
+                    }
+                }
+
+                if (MapUtils.isNotEmpty(entity.getRelationshipAttributes())) {
+                    for (Map.Entry<String, Object> entry : 
entity.getRelationshipAttributes().entrySet()) {
+                        Object attrValue = entry.getValue();
+
+                        if (attrValue != null) {
+                            updateProcessedEntityReferences(attrValue, 
guidAssignments);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Rewrites a single attribute value that may hold entity references.
+     * <p>
+     * The value is dispatched by its runtime type:
+     * <ul>
+     *   <li>an AtlasObjectId is rewritten directly;</li>
+     *   <li>a Collection has each element handled recursively;</li>
+     *   <li>a Map is treated as one object-id in map form, keyed by KEY_GUID.</li>
+     * </ul>
+     * Any other value is left unchanged.
+     * <p>
+     * NOTE(review): because a Map is treated as a single object-id, a map-typed attribute
+     * whose values are entity references is not descended into. Confirm whether such
+     * values also need rewriting.
+     */
+    private void updateProcessedEntityReferences(Object objVal, Map<String, 
String> guidAssignments) {
+        if (objVal instanceof AtlasObjectId) {
+            updateProcessedEntityReferences((AtlasObjectId) objVal, 
guidAssignments);
+        } else if (objVal instanceof Collection) {
+            updateProcessedEntityReferences((Collection) objVal, 
guidAssignments);
+        } else if (objVal instanceof Map) {
+            updateProcessedEntityReferences((Map) objVal, guidAssignments);
+        }
+    }
+
+    /**
+     * Redirects an object-id to the guid assigned by an earlier batch.
+     * <p>
+     * If the object-id's guid is a key in guidAssignments, the guid is replaced with the
+     * assigned guid, and typeName and uniqueAttributes are cleared. Otherwise the
+     * object-id is left unchanged. The object-id is modified in place.
+     *
+     * @param objId           reference to update
+     * @param guidAssignments guids assigned by previously processed batches
+     */
+    private void updateProcessedEntityReferences(AtlasObjectId objId, 
Map<String, String> guidAssignments) {
+        String guid = objId.getGuid();
+
+        if (guid != null && guidAssignments.containsKey(guid)) {
+            String assignedGuid = guidAssignments.get(guid);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{}(guid={}) is already processed; updating its 
reference to use assigned-guid={}", objId.getTypeName(), guid, assignedGuid);
+            }
+
+            objId.setGuid(assignedGuid);
+            objId.setTypeName(null);
+            objId.setUniqueAttributes(null);
+        }
+    }
+
+    /**
+     * Map-form counterpart of the AtlasObjectId overload.
+     * <p>
+     * If the map's KEY_GUID value is a key in guidAssignments, KEY_GUID is set to the
+     * assigned guid and the KEY_TYPENAME and KEY_UNIQUE_ATTRIBUTES entries are removed.
+     * The map is modified in place.
+     * <p>
+     * NOTE(review): this assumes the map is mutable; an unmodifiable map would throw here.
+     *
+     * @param objId           object-id in map form
+     * @param guidAssignments guids assigned by previously processed batches
+     */
+    private void updateProcessedEntityReferences(Map objId, Map<String, 
String> guidAssignments) {
+        Object guid = objId.get(KEY_GUID);
+
+        if (guid != null && guidAssignments.containsKey(guid)) {
+            String assignedGuid = guidAssignments.get(guid);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{}(guid={}) is already processed; updating its 
reference to use assigned-guid={}", objId.get(KEY_TYPENAME), guid, 
assignedGuid);
+            }
+
+            objId.put(KEY_GUID, assignedGuid);
+            objId.remove(KEY_TYPENAME);
+            objId.remove(KEY_UNIQUE_ATTRIBUTES);
+        }
+    }
+
+    /**
+     * Rewrites every element of a collection of references.
+     * <p>
+     * Each element is handed to the Object overload, so nested collections, maps and
+     * object-ids are all handled.
+     *
+     * @param objIds          collection whose elements may be entity references
+     * @param guidAssignments guids assigned by previously processed batches
+     */
+    private void updateProcessedEntityReferences(Collection objIds, 
Map<String, String> guidAssignments) {
+        for (Object objId : objIds) {
+            updateProcessedEntityReferences(objId, guidAssignments);
+        }
+    }
+
     static class FailedCommitOffsetRecorder {
         private Long currentOffset;
 
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
index ff9c9cb..9d6ad22 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/HivePreprocessor.java
@@ -36,7 +36,7 @@ public class HivePreprocessor {
     private static final Logger LOG = 
LoggerFactory.getLogger(HivePreprocessor.class);
 
     private static final String RELATIONSHIP_TYPE_HIVE_TABLE_COLUMNS        = 
"hive_table_columns";
-    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = 
"hive_table_partitionKeys";
+    private static final String RELATIONSHIP_TYPE_HIVE_TABLE_PARTITION_KEYS = 
"hive_table_partitionkeys";
 
     static class HiveTablePreprocessor extends EntityPreprocessor {
         public HiveTablePreprocessor() {
@@ -76,7 +76,7 @@ public class HivePreprocessor {
         }
 
         private void removeColumnsAttributeAndRegisterToMove(AtlasEntity 
entity, String attrName, String relationshipType, PreprocessorContext context) {
-            Object attrVal = entity.getAttribute(attrName);
+            Object attrVal = entity.removeAttribute(attrName);
 
             if (attrVal != null) {
                 Set<String> guids = new HashSet<>();
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
index 94e0993..c85c1b8 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java
@@ -91,6 +91,10 @@ public class PreprocessorContext {
 
     public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return 
rdbmsTypesRemoveOwnedRefAttrs; }
 
+    /**
+     * Returns true when any hive-specific preprocessing is configured.
+     * <p>
+     * That is the case when hiveTablesToIgnore is non-empty, hiveTablesToPrune is
+     * non-empty, or hiveTypesRemoveOwnedRefAttrs is set.
+     */
+    public boolean isHivePreprocessEnabled() {
+        return !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() 
|| hiveTypesRemoveOwnedRefAttrs;
+    }
+
     public List<AtlasEntity> getEntities() {
         return entitiesWithExtInfo != null ? entitiesWithExtInfo.getEntities() 
: null;
     }

Reply via email to