ATLAS-2456: Implement tag propagation using relationships Signed-off-by: Madhan Neethiraj <mad...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/a3374c74 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/a3374c74 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/a3374c74 Branch: refs/heads/master Commit: a3374c747fb900ed44358b8b2c643e439820d2e6 Parents: 9c58d30 Author: Sarath Subramanian <ssubraman...@hortonworks.com> Authored: Mon Feb 19 22:58:36 2018 -0800 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Tue Feb 20 15:01:32 2018 -0800 ---------------------------------------------------------------------- addons/models/0000-Area0/0010-base_model.json | 4 +- addons/models/1000-Hadoop/1030-hive_model.json | 8 +- addons/models/1000-Hadoop/1060-hbase_model.json | 6 +- .../org/apache/atlas/repository/Constants.java | 4 + .../atlas/repository/graphdb/AtlasElement.java | 3 +- .../atlas/repository/graphdb/AtlasVertex.java | 8 + .../graphdb/janus/AtlasJanusVertex.java | 8 + .../repository/graphdb/titan0/Titan0Vertex.java | 14 + .../java/org/apache/atlas/AtlasErrorCode.java | 12 +- .../atlas/listener/EntityChangeListenerV2.java | 81 ++++ .../atlas/model/audit/EntityAuditEventV2.java | 175 ++++++++ .../model/instance/AtlasClassification.java | 45 +++ .../atlas/model/instance/AtlasEntity.java | 11 + .../model/notification/EntityNotification.java | 2 +- .../notification/EntityNotificationV2.java | 129 ++++++ .../repository/audit/EntityAuditListener.java | 14 +- .../repository/audit/EntityAuditListenerV2.java | 263 ++++++++++++ .../repository/audit/EntityAuditRepository.java | 47 ++- .../audit/HBaseBasedAuditRepository.java | 172 +++++++- .../audit/InMemoryEntityAuditRepository.java | 63 ++- .../audit/NoopEntityAuditRepository.java | 35 +- .../converters/AtlasInstanceConverter.java | 102 ++++- .../graph/GraphBackedSearchIndexer.java | 2 + .../atlas/repository/graph/GraphHelper.java | 288 ++++++++++++- .../graph/v1/AtlasEntityChangeNotifier.java | 170 +++++--- 
.../store/graph/v1/AtlasEntityStoreV1.java | 70 +--- .../graph/v1/AtlasRelationshipStoreV1.java | 87 +++- .../store/graph/v1/EntityGraphMapper.java | 399 +++++++++++++++---- .../store/graph/v1/EntityGraphRetriever.java | 392 +++++++++++++++--- .../atlas/util/AtlasGremlin3QueryProvider.java | 12 + .../atlas/util/AtlasGremlinQueryProvider.java | 5 +- .../util/AtlasRepositoryConfiguration.java | 15 + .../test/java/org/apache/atlas/TestModules.java | 6 + .../audit/AuditRepositoryTestBase.java | 16 +- .../EntityNotificationListenerV2.java | 216 ++++++++++ .../NotificationEntityChangeListener.java | 2 +- .../atlas/web/resources/EntityResource.java | 28 +- .../org/apache/atlas/web/rest/EntityREST.java | 59 ++- .../atlas/web/adapters/TestEntityREST.java | 2 +- .../web/integration/EntityJerseyResourceIT.java | 2 +- .../integration/EntityV2JerseyResourceIT.java | 2 +- .../test/resources/atlas-application.properties | 1 + 42 files changed, 2637 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/0000-Area0/0010-base_model.json ---------------------------------------------------------------------- diff --git a/addons/models/0000-Area0/0010-base_model.json b/addons/models/0000-Area0/0010-base_model.json index 0296e8f..aebe955 100644 --- a/addons/models/0000-Area0/0010-base_model.json +++ b/addons/models/0000-Area0/0010-base_model.json @@ -213,7 +213,7 @@ "cardinality": "SET", "isLegacyAttribute": true }, - "propagateTags": "NONE" + "propagateTags": "ONE_TO_TWO" }, { "name": "process_dataset_outputs", @@ -232,7 +232,7 @@ "isContainer": false, "cardinality": "SET" }, - "propagateTags": "NONE" + "propagateTags": "ONE_TO_TWO" } ] } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/1000-Hadoop/1030-hive_model.json ---------------------------------------------------------------------- diff --git 
a/addons/models/1000-Hadoop/1030-hive_model.json b/addons/models/1000-Hadoop/1030-hive_model.json index 32d9179..68a5c84 100644 --- a/addons/models/1000-Hadoop/1030-hive_model.json +++ b/addons/models/1000-Hadoop/1030-hive_model.json @@ -539,7 +539,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" }, { "name": "hive_table_columns", @@ -559,7 +559,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" }, { "name": "hive_table_partitionkeys", @@ -579,7 +579,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" }, { "name": "hive_table_storagedesc", @@ -599,7 +599,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" }, { "name": "hive_process_column_lineage", http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/addons/models/1000-Hadoop/1060-hbase_model.json ---------------------------------------------------------------------- diff --git a/addons/models/1000-Hadoop/1060-hbase_model.json b/addons/models/1000-Hadoop/1060-hbase_model.json index 9280f59..acf4ff5 100644 --- a/addons/models/1000-Hadoop/1060-hbase_model.json +++ b/addons/models/1000-Hadoop/1060-hbase_model.json @@ -157,7 +157,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" }, { "name": "hbase_column_family_columns", @@ -177,7 +177,7 @@ "cardinality": "SINGLE", "isLegacyAttribute": true }, - "propagateTags": "ONE_TO_TWO" + "propagateTags": "NONE" } ] -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/common/src/main/java/org/apache/atlas/repository/Constants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java 
b/common/src/main/java/org/apache/atlas/repository/Constants.java index 265be78..ae52880 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -69,6 +69,7 @@ public final class Constants { * Trait names property key and index name. */ public static final String TRAIT_NAMES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "traitNames"; + public static final String PROPAGATED_TRAIT_NAMES_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagatedTraitNames"; public static final String VERSION_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "version"; public static final String STATE_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "state"; @@ -115,6 +116,9 @@ public final class Constants { public static final String ATTRIBUTE_NAME_VERSION = "version"; public static final String TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct"; + public static final String CLASSIFICATION_ENTITY_GUID = INTERNAL_PROPERTY_KEY_PREFIX + "entityGuid"; + public static final String CLASSIFICATION_PROPAGATE_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "propagate"; + private Constants() { } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java ---------------------------------------------------------------------- diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java index 42837f4..4af39ed 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasElement.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import org.apache.atlas.AtlasException; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -85,7 +84,7 @@ public 
interface AtlasElement { * is needed for this because special logic is required to handle this situation * in some implementations. */ - void setListProperty(String propertyName, List<String> values) throws AtlasException; + void setListProperty(String propertyName, List<String> values); /** http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java ---------------------------------------------------------------------- diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java index a68d8eb..6de4dcf 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasVertex.java @@ -55,6 +55,14 @@ public interface AtlasVertex<V, E> extends AtlasElement { */ <T> void addProperty(String propertyName, T value); + /** + * Adds a value to a multiplicity-many property. + * If the property is already present, the value is added to it; if not, the property is set with the given value + * + * @param propertyName + * @param value + */ + <T> void addListProperty(String propertyName, T value); /** * Creates a vertex query. 
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java ---------------------------------------------------------------------- diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java index aef20f0..71b2857 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusVertex.java @@ -52,6 +52,14 @@ public class AtlasJanusVertex extends AtlasJanusElement<Vertex> implements Atlas } } + @Override + public <T> void addListProperty(String propertyName, T value) { + try { + getWrappedElement().property(VertexProperty.Cardinality.list, propertyName, value); + } catch(SchemaViolationException e) { + throw new AtlasSchemaViolationException(e); + } + } @Override http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java ---------------------------------------------------------------------- diff --git a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java index ca48e3d..e439ab9 100644 --- a/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java +++ b/graphdb/titan0/src/main/java/org/apache/atlas/repository/graphdb/titan0/Titan0Vertex.java @@ -100,6 +100,20 @@ public class Titan0Vertex extends Titan0Element<Vertex> implements AtlasVertex<T } @Override + public <T> void addListProperty(String propertyName, T value) { + try { + getAsTitanVertex().addProperty(propertyName, value); + } catch (SchemaViolationException e) { + if (getPropertyValues(propertyName, 
value.getClass()).contains(value)) { + // follow java set semantics, don't throw an exception if + // value is already there. + return; + } + throw new AtlasSchemaViolationException(e); + } + } + + @Override public <T> Collection<T> getPropertyValues(String key, Class<T> clazz) { TitanVertex tv = getAsTitanVertex(); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java index ff09e6c..f1d4536 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java +++ b/intg/src/main/java/org/apache/atlas/AtlasErrorCode.java @@ -122,6 +122,10 @@ public enum AtlasErrorCode { INVALID_DSL_HAS_PROPERTY(400, "ATLAS-400-00-068", "DSL Semantic Error - Property needs to be a primitive type: {0}"), RELATIONSHIP_UPDATE_END_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-069", "change of relationship end is not permitted. relationship-type={}, relationship-guid={}, end-guid={}, updated-end-guid={}"), RELATIONSHIP_UPDATE_TYPE_CHANGE_NOT_ALLOWED(404, "ATLAS-400-00-06A", "change of relationship type is not permitted. 
relationship-guid={}, current-type={}, new-type={}"), + CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06B", "Update to classification {0} is not allowed from propagated entity"), + CLASSIFICATION_DELETE_FROM_PROPAGATED_ENTITY(400, "ATLAS-400-00-06C", "Delete of classification {0} is not allowed from propagated entity"), + CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY(400, "ATLAS-400-00-06D", "Classification {0} is not associated with entity"), + // All Not found enums go here TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-001", "Given typename {0} was invalid"), TYPE_GUID_NOT_FOUND(404, "ATLAS-404-00-002", "Given type guid {0} was invalid"), @@ -137,6 +141,7 @@ public enum AtlasErrorCode { RELATIONSHIP_CRUD_INVALID_PARAMS(404, "ATLAS-404-00-00D", "Invalid relationship creation/updation parameters passed : {0}"), RELATIONSHIPDEF_END_TYPE_NAME_NOT_FOUND(404, "ATLAS-404-00-00E", "RelationshipDef {0} endDef typename {0} cannot be found"), RELATIONSHIP_ALREADY_DELETED(404, "ATLAS-404-00-00F", "Attempting to delete a relationship which is already deleted : {0}"), + INVALID_ENTITY_GUID_FOR_CLASSIFICATION_UPDATE(404, "ATLAS-404-00-010", "Updating entityGuid of classification is not allowed."), // All data conflict errors go here TYPE_ALREADY_EXISTS(409, "ATLAS-409-00-001", "Given type {0} already exists"), @@ -157,7 +162,6 @@ public enum AtlasErrorCode { FAILED_TO_OBTAIN_GREMLIN_SCRIPT_ENGINE(500, "ATLAS-500-00-008", "Failed to obtain gremlin script engine: {0}"), JSON_ERROR_OBJECT_MAPPER_NULL_RETURNED(500, "ATLAS-500-00-009", "ObjectMapper.readValue returned NULL for class: {0}"), GREMLIN_SCRIPT_EXECUTION_FAILED(500, "ATLAS-500-00-00A", "Gremlin script execution failed: {0}"), - CURATOR_FRAMEWORK_UPDATE(500, "ATLAS-500-00-00B", "ActiveInstanceState.update resulted in exception."), QUICK_START(500, "ATLAS-500-00-00C", "Failed to run QuickStart: {0}"), EMBEDDED_SERVER_START(500, "ATLAS-500-00-00D", "EmbeddedServer.Start: failed!"), @@ -165,9 +169,9 @@ public enum 
AtlasErrorCode { SQOOP_HOOK(500, "ATLAS-500-00-00F", "SqoopHook: {0}"), HIVE_HOOK(500, "ATLAS-500-00-010", "HiveHook: {0}"), HIVE_HOOK_METASTORE_BRIDGE(500, "ATLAS-500-00-011", "HiveHookMetaStoreBridge: {0}"), - - DATA_ACCESS_SAVE_FAILED(500, "ATLAS-500-00-00B", "Save failed: {0}"), - DATA_ACCESS_LOAD_FAILED(500, "ATLAS-500-00-00C", "Load failed: {0}"); + DATA_ACCESS_SAVE_FAILED(500, "ATLAS-500-00-012", "Save failed: {0}"), + DATA_ACCESS_LOAD_FAILED(500, "ATLAS-500-00-013", "Load failed: {0}"), + ENTITY_NOTIFICATION_FAILED(500, "ATLAS-500-00-014", "Notification failed for operation: {} : {}"); private String errorCode; private String errorMessage; http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java new file mode 100644 index 0000000..70877d2 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/listener/EntityChangeListenerV2.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.listener; + +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; + +import java.util.List; + +/** + * Entity change notification listener V2. + */ +public interface EntityChangeListenerV2 { + /** + * This is upon adding new entities to the repository. + * + * @param entities the created entities + * @param isImport + */ + void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException; + + /** + * This is upon updating an entity. + * + * @param entities the updated entities + * @param isImport + */ + void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException; + + /** + * This is upon deleting entities from the repository. + * + * @param entities the deleted entities + * @param isImport + */ + void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException; + + /** + * This is upon adding new classifications to an entity. + * + * @param entity the entity + * @param classifications classifications that need to be added to the entity + * @throws AtlasBaseException if the listener notification fails + */ + void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException; + + /** + * This is upon updating classifications to an entity. + * + * @param entity the entity + * @param classifications classifications that need to be updated for the entity + * @throws AtlasBaseException if the listener notification fails + */ + void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException; + + /** + * This is upon deleting classifications from an entity. 
+ * + * @param entity the entity + * @param classificationNames names of the classifications that need to be deleted from the entity + * @throws AtlasBaseException if the listener notification fails + */ + void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java new file mode 100644 index 0000000..741e371 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/model/audit/EntityAuditEventV2.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.model.audit; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.type.AtlasType; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.Objects; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; + +/** + * Structure of v2 entity audit event + */ +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.ALWAYS) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class EntityAuditEventV2 implements Serializable { + public enum EntityAuditAction { + ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, + ENTITY_IMPORT_CREATE, ENTITY_IMPORT_UPDATE, ENTITY_IMPORT_DELETE, + CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE, + } + + private String entityId; + private long timestamp; + private String user; + private EntityAuditAction action; + private String details; + private String eventKey; + private AtlasEntity entity; + + public EntityAuditEventV2() { } + + public EntityAuditEventV2(String entityId, long timestamp, String user, EntityAuditAction action, String details, + AtlasEntity entity) { + setEntityId(entityId); + setTimestamp(timestamp); + setUser(user); + setAction(action); + setDetails(details); + setEntity(entity); + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + 
public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public EntityAuditAction getAction() { + return action; + } + + public void setAction(EntityAuditAction action) { + this.action = action; + } + + public String getDetails() { + return details; + } + + public void setDetails(String details) { + this.details = details; + } + + public String getEventKey() { + return eventKey; + } + + public void setEventKey(String eventKey) { + this.eventKey = eventKey; + } + + public AtlasEntity getEntity() { + return entity; + } + + public void setEntity(AtlasEntity entity) { + this.entity = entity; + } + + @JsonIgnore + public String getEntityDefinitionString() { + if (entity != null) { + return AtlasType.toJson(entity); + } + + return null; + } + + @JsonIgnore + public void setEntityDefinition(String entityDefinition) { + this.entity = AtlasType.fromJson(entityDefinition, AtlasEntity.class); + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + EntityAuditEventV2 that = (EntityAuditEventV2) o; + + return timestamp == that.timestamp && + Objects.equals(entityId, that.entityId) && + Objects.equals(user, that.user) && + action == that.action && + Objects.equals(details, that.details) && + Objects.equals(eventKey, that.eventKey) && + Objects.equals(entity, that.entity); + } + + @Override + public int hashCode() { + return Objects.hash(entityId, timestamp, user, action, details, eventKey, entity); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("EntityAuditEventV2{"); + sb.append("entityId='").append(entityId).append('\''); + sb.append(", timestamp=").append(timestamp); + sb.append(", user='").append(user).append('\''); + sb.append(", 
action=").append(action); + sb.append(", details='").append(details).append('\''); + sb.append(", eventKey='").append(eventKey).append('\''); + sb.append(", entity=").append(entity); + sb.append('}'); + + return sb.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java index f594a81..a499f79 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasClassification.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize; import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.Objects; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; @@ -49,6 +50,9 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ public class AtlasClassification extends AtlasStruct implements Serializable { private static final long serialVersionUID = 1L; + private String entityGuid = null; + private boolean propagate = true; + public AtlasClassification() { this(null, null); } @@ -76,6 +80,47 @@ public class AtlasClassification extends AtlasStruct implements Serializable { } } + public String getEntityGuid() { + return entityGuid; + } + + public void setEntityGuid(String entityGuid) { + this.entityGuid = entityGuid; + } + + public boolean isPropagate() { + return propagate; + } + + public void setPropagate(boolean propagate) { + this.propagate = propagate; + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; 
} + if (!super.equals(o)) { return false; } + AtlasClassification that = (AtlasClassification) o; + return propagate == that.propagate && + Objects.equals(entityGuid, that.entityGuid); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), entityGuid, propagate); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("AtlasClassification{"); + super.toString(sb); + sb.append("entityGuid='").append(entityGuid).append('\''); + sb.append(", propagate=").append(propagate); + sb.append('}'); + return sb.toString(); + } + /** * REST serialization friendly list. */ http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java index 08d1ce1..fce46da 100644 --- a/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java +++ b/intg/src/main/java/org/apache/atlas/model/instance/AtlasEntity.java @@ -259,6 +259,17 @@ public class AtlasEntity extends AtlasStruct implements Serializable { public void setClassifications(List<AtlasClassification> classifications) { this.classifications = classifications; } + public void addClassifications(List<AtlasClassification> classifications) { + List<AtlasClassification> c = this.classifications; + + if (c == null) { + c = new ArrayList<>(classifications); + + this.classifications = c; + } + + c.addAll(classifications); + } private void init() { setGuid(nextInternalId()); http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java 
b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java index b272b73..f70eb3f 100644 --- a/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java +++ b/intg/src/main/java/org/apache/atlas/model/notification/EntityNotification.java @@ -44,7 +44,7 @@ public class EntityNotification implements Serializable { * Type of the hook message. */ public enum EntityNotificationType { - ENTITY_NOTIFICATION_V1 + ENTITY_NOTIFICATION_V1, ENTITY_NOTIFICATION_V2 } protected EntityNotificationType type; http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java new file mode 100644 index 0000000..a8dfd23 --- /dev/null +++ b/intg/src/main/java/org/apache/atlas/v1/model/notification/EntityNotificationV2.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.v1.model.notification; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.notification.EntityNotification; +import org.apache.atlas.model.typedef.AtlasBaseTypeDef; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlRootElement; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; + +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; +import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; +import static org.apache.atlas.model.notification.EntityNotification.EntityNotificationType.ENTITY_NOTIFICATION_V2; + +/** + * Entity v2 notification + */ +@JsonAutoDetect(getterVisibility=PUBLIC_ONLY, setterVisibility=PUBLIC_ONLY, fieldVisibility=NONE) +@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonIgnoreProperties(ignoreUnknown=true) +@XmlRootElement +@XmlAccessorType(XmlAccessType.PROPERTY) +public class EntityNotificationV2 extends EntityNotification implements Serializable { + private static final long serialVersionUID = 1L; + + public enum OperationType { + ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, + CLASSIFICATION_ADD, CLASSIFICATION_DELETE, CLASSIFICATION_UPDATE + } + + private AtlasEntity entity; + private OperationType operationType; + private List<AtlasClassification> classifications; + + public EntityNotificationV2() { } + + public EntityNotificationV2(AtlasEntity entity, OperationType operationType, List<AtlasClassification> classifications) { + setEntity(entity); + setOperationType(operationType); + setClassifications(classifications); + setType(ENTITY_NOTIFICATION_V2); 
+ } + + public AtlasEntity getEntity() { + return entity; + } + + public void setEntity(AtlasEntity entity) { + this.entity = entity; + } + + public OperationType getOperationType() { + return operationType; + } + + public void setOperationType(OperationType operationType) { + this.operationType = operationType; + } + + public List<AtlasClassification> getClassifications() { + return classifications; + } + + public void setClassifications(List<AtlasClassification> classifications) { + this.classifications = classifications; + } + + @Override + public boolean equals(Object o) { + if (this == o) { return true; } + if (o == null || getClass() != o.getClass()) { return false; } + EntityNotificationV2 that = (EntityNotificationV2) o; + return Objects.equals(entity, that.entity) && + operationType == that.operationType && + Objects.equals(classifications, that.classifications); + } + + @Override + public int hashCode() { + return Objects.hash(entity, operationType, classifications); + } + + @Override + public StringBuilder toString(StringBuilder sb) { + if (sb == null) { + sb = new StringBuilder(); + } + + sb.append("EntityNotificationV1{"); + super.toString(sb); + sb.append(", entity="); + if (entity != null) { + entity.toString(sb); + } else { + sb.append(entity); + } + sb.append(", operationType=").append(operationType); + sb.append(", classifications=["); + AtlasBaseTypeDef.dumpObjects(classifications, sb); + sb.append("]"); + sb.append("}"); + + return sb; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 74d3b91..1c04eea 100644 --- 
a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -67,7 +67,7 @@ public class EntityAuditListener implements EntityChangeListener { events.add(event); } - auditRepository.putEvents(events); + auditRepository.putEventsV1(events); } @Override @@ -78,7 +78,7 @@ public class EntityAuditListener implements EntityChangeListener { events.add(event); } - auditRepository.putEvents(events); + auditRepository.putEventsV1(events); } @Override @@ -88,7 +88,7 @@ public class EntityAuditListener implements EntityChangeListener { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_ADD, "Added trait: " + AtlasType.toV1Json(trait)); - auditRepository.putEvents(event); + auditRepository.putEventsV1(event); } } } @@ -99,7 +99,7 @@ public class EntityAuditListener implements EntityChangeListener { for (String traitName : traitNames) { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName); - auditRepository.putEvents(event); + auditRepository.putEventsV1(event); } } } @@ -111,7 +111,7 @@ public class EntityAuditListener implements EntityChangeListener { EntityAuditEvent event = createEvent(entity, EntityAuditAction.TAG_UPDATE, "Updated trait: " + AtlasType.toV1Json(trait)); - auditRepository.putEvents(event); + auditRepository.putEventsV1(event); } } } @@ -124,11 +124,11 @@ public class EntityAuditListener implements EntityChangeListener { events.add(event); } - auditRepository.putEvents(events); + auditRepository.putEventsV1(events); } public List<EntityAuditEvent> getAuditEvents(String guid) throws AtlasException{ - return auditRepository.listEvents(guid, null, (short) 10); + return auditRepository.listEventsV1(guid, null, (short) 10); } private EntityAuditEvent createEvent(Referenceable entity, EntityAuditAction action) 
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java new file mode 100644 index 0000000..bb51014 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListenerV2.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit; + +import org.apache.atlas.RequestContextV1; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.listener.EntityChangeListenerV2; +import org.apache.atlas.model.instance.AtlasClassification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasStructType.AtlasAttribute; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_ADD; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_CREATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_CREATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_IMPORT_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.ENTITY_UPDATE; + +@Component +public class 
EntityAuditListenerV2 implements EntityChangeListenerV2 { + private static final Logger LOG = LoggerFactory.getLogger(EntityAuditListenerV2.class); + + private final EntityAuditRepository auditRepository; + private final AtlasTypeRegistry typeRegistry; + + @Inject + public EntityAuditListenerV2(EntityAuditRepository auditRepository, AtlasTypeRegistry typeRegistry) { + this.auditRepository = auditRepository; + this.typeRegistry = typeRegistry; + } + + @Override + public void onEntitiesAdded(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasEntity entity : entities) { + EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_CREATE : ENTITY_CREATE); + + events.add(event); + } + + auditRepository.putEventsV2(events); + } + + @Override + public void onEntitiesUpdated(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasEntity entity : entities) { + EntityAuditEventV2 event = createEvent(entity, isImport ? ENTITY_IMPORT_UPDATE : ENTITY_UPDATE); + + events.add(event); + } + + auditRepository.putEventsV2(events); + } + + @Override + public void onEntitiesDeleted(List<AtlasEntity> entities, boolean isImport) throws AtlasBaseException { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasEntity entity : entities) { + EntityAuditEventV2 event = createEvent(entity, isImport ? 
ENTITY_IMPORT_DELETE : ENTITY_DELETE, "Deleted entity"); + + events.add(event); + } + + auditRepository.putEventsV2(events); + } + + @Override + public void onClassificationsAdded(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classifications)) { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasClassification classification : classifications) { + events.add(createEvent(entity, CLASSIFICATION_ADD, "Added classification: " + AtlasType.toJson(classification))); + } + + auditRepository.putEventsV2(events); + } + } + + @Override + public void onClassificationsUpdated(AtlasEntity entity, List<AtlasClassification> classifications) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classifications)) { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (AtlasClassification classification : classifications) { + events.add(createEvent(entity, CLASSIFICATION_UPDATE, "Updated classification: " + AtlasType.toJson(classification))); + } + + auditRepository.putEventsV2(events); + } + } + + @Override + public void onClassificationsDeleted(AtlasEntity entity, List<String> classificationNames) throws AtlasBaseException { + if (CollectionUtils.isNotEmpty(classificationNames)) { + List<EntityAuditEventV2> events = new ArrayList<>(); + + for (String classificationName : classificationNames) { + events.add(createEvent(entity, CLASSIFICATION_DELETE, "Deleted classification: " + classificationName)); + } + + auditRepository.putEventsV2(events); + } + } + + private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action, String details) { + return new EntityAuditEventV2(entity.getGuid(), RequestContextV1.get().getRequestTime(), + RequestContextV1.get().getUser(), action, details, entity); + } + + private EntityAuditEventV2 createEvent(AtlasEntity entity, EntityAuditAction action) { + String detail = getAuditEventDetail(entity, action); + + return 
createEvent(entity, action, detail); + } + + private String getAuditEventDetail(AtlasEntity entity, EntityAuditAction action) { + Map<String, Object> prunedAttributes = pruneEntityAttributesForAudit(entity); + + String auditPrefix = getAuditPrefix(action); + String auditString = auditPrefix + AtlasType.toJson(entity); + byte[] auditBytes = auditString.getBytes(StandardCharsets.UTF_8); + long auditSize = auditBytes != null ? auditBytes.length : 0; + long auditMaxSize = auditRepository.repositoryMaxSize(); + + if (auditMaxSize >= 0 && auditSize > auditMaxSize) { // don't store attributes in audit + LOG.warn("audit record too long: entityType={}, guid={}, size={}; maxSize={}. entity attribute values not stored in audit", + entity.getTypeName(), entity.getGuid(), auditSize, auditMaxSize); + + Map<String, Object> attrValues = entity.getAttributes(); + + entity.setAttributes(null); + + auditString = auditPrefix + AtlasType.toJson(entity); + + entity.setAttributes(attrValues); + } + + restoreEntityAttributes(entity, prunedAttributes); + + return auditString; + } + + private Map<String, Object> pruneEntityAttributesForAudit(AtlasEntity entity) { + Map<String, Object> ret = null; + Map<String, Object> entityAttributes = entity.getAttributes(); + List<String> excludeAttributes = auditRepository.getAuditExcludeAttributes(entity.getTypeName()); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (CollectionUtils.isNotEmpty(excludeAttributes) && MapUtils.isNotEmpty(entityAttributes) && entityType != null) { + for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { + String attrName = attribute.getName(); + Object attrValue = entityAttributes.get(attrName); + + if (excludeAttributes.contains(attrName)) { + if (ret == null) { + ret = new HashMap<>(); + } + + ret.put(attrName, attrValue); + entityAttributes.remove(attrName); + } + } + } + + return ret; + } + + private void restoreEntityAttributes(AtlasEntity entity, 
Map<String, Object> prunedAttributes) { + if (MapUtils.isEmpty(prunedAttributes)) { + return; + } + + AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName()); + + if (entityType != null && MapUtils.isNotEmpty(entityType.getAllAttributes())) { + for (AtlasAttribute attribute : entityType.getAllAttributes().values()) { + String attrName = attribute.getName(); + + if (prunedAttributes.containsKey(attrName)) { + entity.setAttribute(attrName, prunedAttributes.get(attrName)); + } + } + } + } + + private String getAuditPrefix(EntityAuditAction action) { + final String ret; + + switch (action) { + case ENTITY_CREATE: + ret = "Created: "; + break; + case ENTITY_UPDATE: + ret = "Updated: "; + break; + case ENTITY_DELETE: + ret = "Deleted: "; + break; + case CLASSIFICATION_ADD: + ret = "Added classification: "; + break; + case CLASSIFICATION_DELETE: + ret = "Deleted classification: "; + break; + case CLASSIFICATION_UPDATE: + ret = "Updated classification: "; + break; + case ENTITY_IMPORT_CREATE: + ret = "Created by import: "; + break; + case ENTITY_IMPORT_UPDATE: + ret = "Updated by import: "; + break; + case ENTITY_IMPORT_DELETE: + ret = "Deleted by import: "; + break; + default: + ret = "Unknown: "; + } + + return ret; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java index 9dc7835..aab2d5b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditRepository.java @@ -20,6 +20,8 @@ package org.apache.atlas.repository.audit; import 
org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.exception.AtlasBaseException; import java.util.List; @@ -32,14 +34,14 @@ public interface EntityAuditRepository { * @param events events to be added * @throws AtlasException */ - void putEvents(EntityAuditEvent... events) throws AtlasException; + void putEventsV1(EntityAuditEvent... events) throws AtlasException; /** * Add events to the event repository * @param events events to be added * @throws AtlasException */ - void putEvents(List<EntityAuditEvent> events) throws AtlasException; + void putEventsV1(List<EntityAuditEvent> events) throws AtlasException; /** * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results @@ -49,13 +51,48 @@ public interface EntityAuditRepository { * @return list of events * @throws AtlasException */ - List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) throws AtlasException; + List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n) throws AtlasException; + + /** + * Add v2 events to the event repository + * @param events events to be added + * @throws AtlasBaseException + */ + void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException; + + /** + * Add v2 events to the event repository + * @param events events to be added + * @throws AtlasBaseException + */ + void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException; + + /** + * List events for the given entity id in decreasing order of timestamp, from the given timestamp. 
Returns n results + * @param entityId entity id + * @param startKey key for the first event to be returned, used for pagination + * @param n number of events to be returned + * @return list of events + * @throws AtlasBaseException + */ + List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException; + + + /** + * List events for the given entity id in decreasing order of timestamp, from the given timestamp. Returns n results + * @param entityId entity id + * @param startKey key for the first event to be returned, used for pagination + * @param n number of events to be returned + * @return list of events + * @throws AtlasBaseException + */ + List<Object> listEvents(String entityId, String startKey, short n) throws AtlasBaseException; /** * Returns maximum allowed repository size per EntityAuditEvent * @throws AtlasException */ - long repositoryMaxSize() throws AtlasException; + long repositoryMaxSize(); /** * list of attributes to be excluded when storing in audit repo. 
@@ -63,5 +100,5 @@ public interface EntityAuditRepository { * @return list of attribute names to be excluded * @throws AtlasException */ - List<String> getAuditExcludeAttributes(String entityType) throws AtlasException; + List<String> getAuditExcludeAttributes(String entityType); } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 774934c..a22f421 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -23,9 +23,13 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction; +import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -116,17 +120,17 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository * @throws AtlasException */ @Override - public void putEvents(EntityAuditEvent... events) throws AtlasException { - putEvents(Arrays.asList(events)); + public void putEventsV1(EntityAuditEvent... 
events) throws AtlasException { + putEventsV1(Arrays.asList(events)); } - @Override /** * Add events to the event repository * @param events events to be added * @throws AtlasException */ - public void putEvents(List<EntityAuditEvent> events) throws AtlasException { + @Override + public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { if (LOG.isDebugEnabled()) { LOG.debug("Putting {} events", events.size()); } @@ -154,6 +158,146 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } } + @Override + public void putEventsV2(EntityAuditEventV2... events) throws AtlasBaseException { + putEventsV2(Arrays.asList(events)); + } + + @Override + public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Putting {} events", events.size()); + } + + Table table = null; + + try { + table = connection.getTable(tableName); + List<Put> puts = new ArrayList<>(events.size()); + + for (EntityAuditEventV2 event : events) { + if (LOG.isDebugEnabled()) { + LOG.debug("Adding entity audit event {}", event); + } + + Put put = new Put(getKey(event.getEntityId(), event.getTimestamp())); + + addColumn(put, COLUMN_ACTION, event.getAction()); + addColumn(put, COLUMN_USER, event.getUser()); + addColumn(put, COLUMN_DETAIL, event.getDetails()); + + if (persistEntityDefinition) { + addColumn(put, COLUMN_DEFINITION, event.getEntity()); + } + + puts.add(put); + } + + table.put(puts); + } catch (IOException e) { + throw new AtlasBaseException(e); + } finally { + try { + close(table); + } catch (AtlasException e) { + throw new AtlasBaseException(e); + } + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) throws AtlasBaseException { + if (LOG.isDebugEnabled()) { + LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n); + } + + Table table = null; + ResultScanner scanner = 
null; + + try { + table = connection.getTable(tableName); + + /** + * Scan Details: + * In hbase, the events are stored in increasing order of timestamp. So, doing reverse scan to get the latest event first + * Page filter is set to limit the number of results returned. + * Stop row is set to the entity id to avoid going past the current entity while scanning + * small is set to true to optimise RPC calls as the scanner is created per request + */ + Scan scan = new Scan().setReversed(true).setFilter(new PageFilter(n)) + .setStopRow(Bytes.toBytes(entityId)) + .setCaching(n) + .setSmall(true); + + if (StringUtils.isEmpty(startKey)) { + //Set start row to entity id + max long value + byte[] entityBytes = getKey(entityId, Long.MAX_VALUE); + scan = scan.setStartRow(entityBytes); + } else { + scan = scan.setStartRow(Bytes.toBytes(startKey)); + } + + scanner = table.getScanner(scan); + List<EntityAuditEventV2> events = new ArrayList<>(); + + Result result; + + //PageFilter doesn't ensure n results are returned. The filter is per region server. 
+ //So, adding extra check on n here + while ((result = scanner.next()) != null && events.size() < n) { + EntityAuditEventV2 event = fromKeyV2(result.getRow()); + + //In case the user sets random start key, guarding against random events + if (!event.getEntityId().equals(entityId)) { + continue; + } + event.setUser(getResultString(result, COLUMN_USER)); + event.setAction(EntityAuditAction.valueOf(getResultString(result, COLUMN_ACTION))); + event.setDetails(getResultString(result, COLUMN_DETAIL)); + + if (persistEntityDefinition) { + String colDef = getResultString(result, COLUMN_DEFINITION); + + if (colDef != null) { + event.setEntityDefinition(colDef); + } + } + + events.add(event); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Got events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, events.size()); + } + + return events; + } catch (IOException e) { + throw new AtlasBaseException(e); + } finally { + try { + close(scanner); + close(table); + } catch (AtlasException e) { + throw new AtlasBaseException(e); + } + } + } + + @Override + public List<Object> listEvents(String entityId, String startKey, short maxResults) throws AtlasBaseException { + List ret = listEventsV2(entityId, startKey, maxResults); + + try { + if (CollectionUtils.isEmpty(ret)) { + ret = listEventsV1(entityId, startKey, maxResults); + } + } catch (AtlasException e) { + throw new AtlasBaseException(e); + } + + return ret; + } + private <T> void addColumn(Put put, byte[] columnName, T columnValue) { if (columnValue != null && !columnValue.toString().isEmpty()) { put.addColumn(COLUMN_FAMILY, columnName, Bytes.toBytes(columnValue.toString())); @@ -175,7 +319,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository * @return list of events * @throws AtlasException */ - public List<EntityAuditEvent> listEvents(String entityId, String startKey, short n) + public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n) throws 
AtlasException { if (LOG.isDebugEnabled()) { LOG.debug("Listing events for entity id {}, starting timestamp {}, #records {}", entityId, startKey, n); @@ -243,7 +387,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } @Override - public long repositoryMaxSize() throws AtlasException { + public long repositoryMaxSize() { long ret; initApplicationProperties(); @@ -257,7 +401,7 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository } @Override - public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + public List<String> getAuditExcludeAttributes(String entityType) { List<String> ret = null; initApplicationProperties(); @@ -308,6 +452,20 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository return event; } + private EntityAuditEventV2 fromKeyV2(byte[] keyBytes) { + String key = Bytes.toString(keyBytes); + EntityAuditEventV2 event = new EntityAuditEventV2(); + + if (StringUtils.isNotEmpty(key)) { + String[] parts = key.split(FIELD_SEPARATOR); + event.setEntityId(parts[0]); + event.setTimestamp(Long.valueOf(parts[1])); + event.setEventKey(key); + } + + return event; + } + private void close(Closeable closeable) throws AtlasException { if (closeable != null) { try { http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java index 22d2a81..dca3b85 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/InMemoryEntityAuditRepository.java @@ -21,6 +21,8 @@ package 
org.apache.atlas.repository.audit; import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import javax.inject.Singleton; @@ -37,15 +39,16 @@ import java.util.TreeMap; @Component @ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl") public class InMemoryEntityAuditRepository implements EntityAuditRepository { - private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>(); + private TreeMap<String, EntityAuditEvent> auditEvents = new TreeMap<>(); + private TreeMap<String, EntityAuditEventV2> auditEventsV2 = new TreeMap<>(); @Override - public void putEvents(EntityAuditEvent... events) throws AtlasException { - putEvents(Arrays.asList(events)); + public void putEventsV1(EntityAuditEvent... events) throws AtlasException { + putEventsV1(Arrays.asList(events)); } @Override - public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException { + public synchronized void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { for (EntityAuditEvent event : events) { String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp()); event.setEventKey(rowKey); @@ -56,8 +59,7 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { //synchronized to avoid concurrent modification exception that occurs if events are added //while we are iterating through the map @Override - public synchronized List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults) - throws AtlasException { + public synchronized List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) { List<EntityAuditEvent> events = new ArrayList<>(); String myStartKey = startKey; if (myStartKey == null) { @@ -73,12 
+75,57 @@ public class InMemoryEntityAuditRepository implements EntityAuditRepository { } @Override - public long repositoryMaxSize() throws AtlasException { + public long repositoryMaxSize() { return -1; } @Override - public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + public List<String> getAuditExcludeAttributes(String entityType) { return null; } + + @Override + public void putEventsV2(EntityAuditEventV2... events) { + putEventsV2(Arrays.asList(events)); + } + + @Override + public void putEventsV2(List<EntityAuditEventV2> events) { + for (EntityAuditEventV2 event : events) { + String rowKey = event.getEntityId() + (Long.MAX_VALUE - event.getTimestamp()); + event.setEventKey(rowKey); + auditEventsV2.put(rowKey, event); + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short maxResults) { + List<EntityAuditEventV2> events = new ArrayList<>(); + String myStartKey = startKey; + + if (myStartKey == null) { + myStartKey = entityId; + } + + SortedMap<String, EntityAuditEventV2> subMap = auditEventsV2.tailMap(myStartKey); + + for (EntityAuditEventV2 event : subMap.values()) { + if (events.size() < maxResults && event.getEntityId().equals(entityId)) { + events.add(event); + } + } + + return events; + } + + @Override + public List<Object> listEvents(String entityId, String startKey, short maxResults) { + List events = listEventsV2(entityId, startKey, maxResults); + + if (CollectionUtils.isEmpty(events)) { + events = listEventsV1(entityId, startKey, maxResults); + } + + return events; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java 
b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java index c382601..e3a6078 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/NoopEntityAuditRepository.java @@ -18,9 +18,9 @@ package org.apache.atlas.repository.audit; -import org.apache.atlas.AtlasException; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.springframework.stereotype.Component; import javax.inject.Singleton; @@ -36,28 +36,47 @@ import java.util.List; public class NoopEntityAuditRepository implements EntityAuditRepository { @Override - public void putEvents(EntityAuditEvent... events) throws AtlasException { + public void putEventsV1(EntityAuditEvent... events) { //do nothing } @Override - public synchronized void putEvents(List<EntityAuditEvent> events) throws AtlasException { + public synchronized void putEventsV1(List<EntityAuditEvent> events) { //do nothing } @Override - public List<EntityAuditEvent> listEvents(String entityId, String startKey, short maxResults) - throws AtlasException { + public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short maxResults) { return Collections.emptyList(); } @Override - public long repositoryMaxSize() throws AtlasException { + public void putEventsV2(EntityAuditEventV2... 
events) { + //do nothing + } + + @Override + public void putEventsV2(List<EntityAuditEventV2> events) { + //do nothing + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, String startKey, short n) { + return Collections.emptyList(); + } + + @Override + public List<Object> listEvents(String entityId, String startKey, short n) { + return Collections.emptyList(); + } + + @Override + public long repositoryMaxSize() { return -1; } @Override - public List<String> getAuditExcludeAttributes(String entityType) throws AtlasException { + public List<String> getAuditExcludeAttributes(String entityType) { return null; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java index 2884f8f..f9598eb 100644 --- a/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java +++ b/repository/src/main/java/org/apache/atlas/repository/converters/AtlasInstanceConverter.java @@ -20,7 +20,9 @@ package org.apache.atlas.repository.converters; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; +import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.RequestContextV1; +import org.apache.atlas.model.audit.EntityAuditEventV2; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.TypeCategory; import org.apache.atlas.model.instance.AtlasClassification; @@ -54,6 +56,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_ADD; +import 
static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_DELETE; +import static org.apache.atlas.EntityAuditEvent.EntityAuditAction.TAG_UPDATE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_DELETE; +import static org.apache.atlas.model.audit.EntityAuditEventV2.EntityAuditAction.CLASSIFICATION_UPDATE; +import static org.apache.atlas.v1.model.notification.EntityNotificationV2.OperationType.CLASSIFICATION_ADD; + @Singleton @Component public class AtlasInstanceConverter { @@ -290,7 +299,7 @@ public class AtlasInstanceConverter { } - private AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntity(String guid) throws AtlasBaseException { + public AtlasEntity.AtlasEntityWithExtInfo getAndCacheEntity(String guid) throws AtlasBaseException { RequestContextV1 context = RequestContextV1.get(); AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo = context.getInstanceV2(guid); @@ -308,4 +317,93 @@ public class AtlasInstanceConverter { return entityWithExtInfo; } -} + + public EntityAuditEvent toV1AuditEvent(EntityAuditEventV2 v2Event) throws AtlasBaseException { + EntityAuditEvent ret = new EntityAuditEvent(); + + ret.setEntityId(v2Event.getEntityId()); + ret.setTimestamp(v2Event.getTimestamp()); + ret.setUser(v2Event.getUser()); + ret.setDetails(v2Event.getDetails()); + ret.setEventKey(v2Event.getEventKey()); + + ret.setAction(getV1AuditAction(v2Event.getAction())); + ret.setEntityDefinition(getReferenceable(v2Event.getEntityId())); + + return ret; + } + + public EntityAuditEventV2 toV2AuditEvent(EntityAuditEvent v1Event) throws AtlasBaseException { + EntityAuditEventV2 ret = new EntityAuditEventV2(); + + ret.setEntityId(v1Event.getEntityId()); + ret.setTimestamp(v1Event.getTimestamp()); + ret.setUser(v1Event.getUser()); + ret.setDetails(v1Event.getDetails()); + ret.setEventKey(v1Event.getEventKey()); + ret.setAction(getV2AuditAction(v1Event.getAction())); + + AtlasEntitiesWithExtInfo entitiesWithExtInfo = 
toAtlasEntity(v1Event.getEntityDefinition()); + + if (entitiesWithExtInfo != null && CollectionUtils.isNotEmpty(entitiesWithExtInfo.getEntities())) { + // there will only be one source entity + AtlasEntity entity = entitiesWithExtInfo.getEntities().get(0); + + ret.setEntity(entity); + } + + return ret; + } + + private EntityAuditEvent.EntityAuditAction getV1AuditAction(EntityAuditEventV2.EntityAuditAction v2AuditAction) { + EntityAuditEvent.EntityAuditAction ret = null; + + switch (v2AuditAction) { + case ENTITY_CREATE: + case ENTITY_UPDATE: + case ENTITY_DELETE: + case ENTITY_IMPORT_CREATE: + case ENTITY_IMPORT_UPDATE: + case ENTITY_IMPORT_DELETE: + ret = EntityAuditEvent.EntityAuditAction.valueOf(v2AuditAction.name()); + break; + case CLASSIFICATION_ADD: + ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_ADD.name()); + break; + case CLASSIFICATION_DELETE: + ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_DELETE.name()); + break; + case CLASSIFICATION_UPDATE: + ret = EntityAuditEvent.EntityAuditAction.valueOf(TAG_UPDATE.name()); + break; + } + + return ret; + } + + private EntityAuditEventV2.EntityAuditAction getV2AuditAction(EntityAuditEvent.EntityAuditAction v1AuditAction) { + EntityAuditEventV2.EntityAuditAction ret = null; + + switch (v1AuditAction) { + case ENTITY_CREATE: + case ENTITY_UPDATE: + case ENTITY_DELETE: + case ENTITY_IMPORT_CREATE: + case ENTITY_IMPORT_UPDATE: + case ENTITY_IMPORT_DELETE: + ret = EntityAuditEventV2.EntityAuditAction.valueOf(v1AuditAction.name()); + break; + case TAG_ADD: + ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_ADD.name()); + break; + case TAG_DELETE: + ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_DELETE.name()); + break; + case TAG_UPDATE: + ret = EntityAuditEventV2.EntityAuditAction.valueOf(CLASSIFICATION_UPDATE.name()); + break; + } + + return ret; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/atlas/blob/a3374c74/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 31620b1..e609366 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -74,6 +74,7 @@ import static org.apache.atlas.repository.Constants.FULLTEXT_INDEX; import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.MODIFIED_BY_KEY; +import static org.apache.atlas.repository.Constants.PROPAGATED_TRAIT_NAMES_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.RELATIONSHIP_GUID_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.STATE_PROPERTY_KEY; import static org.apache.atlas.repository.Constants.SUPER_TYPES_PROPERTY_KEY; @@ -275,6 +276,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createVertexIndex(management, ENTITY_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true); createVertexIndex(management, SUPER_TYPES_PROPERTY_KEY, String.class, false, SET, true, true); createVertexIndex(management, TRAIT_NAMES_PROPERTY_KEY, String.class, false, SET, true, true); + createVertexIndex(management, PROPAGATED_TRAIT_NAMES_PROPERTY_KEY, String.class, false, LIST, true, true); createVertexIndex(management, TYPENAME_PROPERTY_KEY, String.class, true, SINGLE, true, true); createVertexIndex(management, VERTEX_TYPE_PROPERTY_KEY, String.class, false, SINGLE, true, true);