Repository: ranger Updated Branches: refs/heads/master 49b8a7a26 -> 9ed210470
RANGER-1997: Update tagsync to handle Atlas notifications of type V1 and V2 Project: http://git-wip-us.apache.org/repos/asf/ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/ranger/commit/9ed21047 Tree: http://git-wip-us.apache.org/repos/asf/ranger/tree/9ed21047 Diff: http://git-wip-us.apache.org/repos/asf/ranger/diff/9ed21047 Branch: refs/heads/master Commit: 9ed210470bbc447d1fbacd9b98421309394061b0 Parents: 49b8a7a Author: Abhay Kulkarni <akulka...@hortonworks.com> Authored: Wed Feb 28 12:59:58 2018 -0800 Committer: Abhay Kulkarni <akulka...@hortonworks.com> Committed: Wed Feb 28 12:59:58 2018 -0800 ---------------------------------------------------------------------- .../source/atlas/AtlasNotificationMapper.java | 30 +-- .../tagsync/source/atlas/AtlasTagSource.java | 44 ++-- .../source/atlas/EntityNotificationWrapper.java | 242 +++++++++++++++++++ .../atlasrest/RangerAtlasEntityWithTags.java | 31 +-- 4 files changed, 283 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java index 91cf606..916aad3 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java @@ -19,9 +19,6 @@ package org.apache.ranger.tagsync.source.atlas; -import org.apache.atlas.v1.model.notification.EntityNotificationV1; -import org.apache.atlas.v1.model.instance.Id; -import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.commons.collections.CollectionUtils; import 
org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; @@ -46,13 +43,12 @@ public class AtlasNotificationMapper { private static Map<String, Long> unhandledEventTypes = new HashMap<String, Long>(); - private static void logUnhandledEntityNotification(EntityNotificationV1 entityNotification) { + private static void logUnhandledEntityNotification(EntityNotificationWrapper entityNotification) { final int REPORTING_INTERVAL_FOR_UNHANDLED_ENTITYTYPE_IN_MILLIS = 5 * 60 * 1000; // 5 minutes boolean loggingNeeded = false; - String entityTypeName = entityNotification != null && entityNotification.getEntity() != null ? - entityNotification.getEntity().getTypeName() : null; + String entityTypeName = entityNotification.getEntityTypeName(); if (entityTypeName != null) { Long timeInMillis = unhandledEventTypes.get(entityTypeName); @@ -76,7 +72,7 @@ public class AtlasNotificationMapper { } @SuppressWarnings("unchecked") - public static ServiceTags processEntityNotification(EntityNotificationV1 entityNotification) { + public static ServiceTags processEntityNotification(EntityNotificationWrapper entityNotification) { ServiceTags ret = null; @@ -84,7 +80,7 @@ public class AtlasNotificationMapper { try { RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(entityNotification); - if (entityNotification.getOperationType() == EntityNotificationV1.OperationType.ENTITY_DELETE) { + if (entityNotification.getIsEntityDeleteOp()) { ret = buildServiceTagsForEntityDeleteNotification(entityWithTags); } else { ret = buildServiceTags(entityWithTags, null); @@ -111,21 +107,21 @@ public class AtlasNotificationMapper { return ret; } - static private boolean isNotificationHandled(EntityNotificationV1 entityNotification) { + static private boolean isNotificationHandled(EntityNotificationWrapper entityNotification) { boolean ret = false; - EntityNotificationV1.OperationType opType = entityNotification.getOperationType(); + 
EntityNotificationWrapper.NotificationType opType = entityNotification.getEntityNotificationType(); if (opType != null) { switch (opType) { case ENTITY_CREATE: - ret = CollectionUtils.isNotEmpty(entityNotification.getAllTraits()); + ret = ! entityNotification.getIsEmptyClassifications(); break; case ENTITY_UPDATE: case ENTITY_DELETE: - case TRAIT_ADD: - case TRAIT_UPDATE: - case TRAIT_DELETE: { + case CLASSIFICATION_ADD: + case CLASSIFICATION_UPDATE: + case CLASSIFICATION_DELETE: { ret = true; break; } @@ -134,11 +130,7 @@ public class AtlasNotificationMapper { break; } if (ret) { - final Referenceable entity = entityNotification.getEntity(); - - ret = entity != null - && entity.getId().getState() == Id.EntityState.ACTIVE - && AtlasResourceMapperUtil.isEntityTypeHandled(entity.getTypeName()); + ret = entityNotification.getIsEntityTypeHandled(); } } http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java index 8c15ee5..a13a789 100644 --- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java @@ -21,9 +21,9 @@ package org.apache.ranger.tagsync.source.atlas; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.model.notification.EntityNotification; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.v1.model.notification.EntityNotificationV1; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -101,7 +101,7 @@ public class 
AtlasTagSource extends AbstractTagSource { if (ret) { NotificationInterface notification = NotificationProvider.get(); - List<NotificationConsumer<EntityNotificationV1>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); + List<NotificationConsumer<EntityNotification>> iterators = notification.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1); consumerTask = new ConsumerRunnable(iterators.get(0)); @@ -138,10 +138,10 @@ public class AtlasTagSource extends AbstractTagSource { } } - private static String getPrintableEntityNotification(EntityNotificationV1 notification) { + private static String getPrintableEntityNotification(EntityNotificationWrapper notification) { StringBuilder sb = new StringBuilder(); - sb.append("{ Notification-Type: ").append(notification.getOperationType()).append(", "); + sb.append("{ Notification-Type: ").append(notification.getEntityNotificationType()).append(", "); RangerAtlasEntityWithTags entityWithTags = new RangerAtlasEntityWithTags(notification); sb.append(entityWithTags.toString()); @@ -151,9 +151,9 @@ public class AtlasTagSource extends AbstractTagSource { private class ConsumerRunnable implements Runnable { - private final NotificationConsumer<EntityNotificationV1> consumer; + private final NotificationConsumer<EntityNotification> consumer; - private ConsumerRunnable(NotificationConsumer<EntityNotificationV1> consumer) { + private ConsumerRunnable(NotificationConsumer<EntityNotification> consumer) { this.consumer = consumer; } @@ -165,23 +165,31 @@ public class AtlasTagSource extends AbstractTagSource { } while (true) { try { - List<AtlasKafkaMessage<EntityNotificationV1>> messages = consumer.receive(1000L); + List<AtlasKafkaMessage<EntityNotification>> messages = consumer.receive(1000L); - for (AtlasKafkaMessage<EntityNotificationV1> message : messages) { - EntityNotificationV1 notification = message != null ? 
message.getMessage() : null; + for (AtlasKafkaMessage<EntityNotification> message : messages) { + EntityNotification notification = message != null ? message.getMessage() : null; if (notification != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification=" + getPrintableEntityNotification(notification)); + EntityNotificationWrapper notificationWrapper = null; + try { + notificationWrapper = new EntityNotificationWrapper(notification); + } catch (Throwable e) { + LOG.error("notification:[" + notification +"] has some issues..perhaps null entity??", e); } - - ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notification); - if (serviceTags != null) { - updateSink(serviceTags); + if (notificationWrapper != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notification=" + getPrintableEntityNotification(notificationWrapper)); + } + + ServiceTags serviceTags = AtlasNotificationMapper.processEntityNotification(notificationWrapper); + if (serviceTags != null) { + updateSink(serviceTags); + } + + TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); + consumer.commit(partition, message.getOffset()); } - - TopicPartition partition = new TopicPartition("ATLAS_ENTITIES", message.getPartition()); - consumer.commit(partition, message.getOffset()); } else { LOG.error("Null entityNotification received from Kafka!! 
Ignoring.."); } http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java ---------------------------------------------------------------------- diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java new file mode 100644 index 0000000..e680b14 --- /dev/null +++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/EntityNotificationWrapper.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.ranger.tagsync.source.atlas;
+
+import org.apache.atlas.model.instance.AtlasClassification;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.notification.EntityNotification;
+import org.apache.atlas.v1.model.instance.Id;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.v1.model.notification.EntityNotificationV1;
+import org.apache.atlas.v1.model.notification.EntityNotificationV2;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.tagsync.source.atlasrest.RangerAtlasEntity;
+
+import javax.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+/** Version-neutral view over an Atlas V1 or V2 entity notification; the entity and all flags are computed once in the constructor. */
+public class EntityNotificationWrapper {
+    private static final Log LOG = LogFactory.getLog(EntityNotificationWrapper.class);
+
+    public enum NotificationType { UNKNOWN, ENTITY_CREATE, ENTITY_UPDATE, ENTITY_DELETE, CLASSIFICATION_ADD, CLASSIFICATION_UPDATE, CLASSIFICATION_DELETE}
+
+    private final EntityNotification                        notification;
+    private final EntityNotification.EntityNotificationType notificationType;
+    private final RangerAtlasEntity                         rangerAtlasEntity;
+    private final String                                    entityTypeName;
+    private final boolean                                   isEntityTypeHandled;
+    private final boolean                                   isEntityDeleteOp;
+    private final boolean                                   isEmptyClassifications;
+    /** Extracts entity, type name and flags from the notification; the entity is dereferenced without null checks, so a missing entity throws here. */
+    EntityNotificationWrapper(@Nonnull EntityNotification notification) {
+        this.notification = notification;
+        notificationType  = this.notification.getType();
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                // V2 payload: an AtlasEntity plus a list of AtlasClassification
+                EntityNotificationV2 v2Notification = (EntityNotificationV2) notification;
+                AtlasEntity          atlasEntity    = v2Notification.getEntity();
+                String               guid           = atlasEntity.getGuid();
+                String               typeName       = atlasEntity.getTypeName();
+
+                rangerAtlasEntity      = new RangerAtlasEntity(typeName, guid, atlasEntity.getAttributes());
+                entityTypeName         = atlasEntity.getTypeName();
+                isEntityTypeHandled    = atlasEntity.getStatus() == AtlasEntity.Status.ACTIVE
+                                            && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+                isEntityDeleteOp       = EntityNotificationV2.OperationType.ENTITY_DELETE == v2Notification.getOperationType();
+                isEmptyClassifications = CollectionUtils.isEmpty(v2Notification.getClassifications()); // true only when no classifications are present
+            }
+            break;
+            case ENTITY_NOTIFICATION_V1: {
+                EntityNotificationV1 v1Notification = (EntityNotificationV1) notification;
+
+                Referenceable atlasEntity = v1Notification.getEntity();
+                String        guid        = atlasEntity.getId()._getId();
+                String        typeName    = atlasEntity.getTypeName();
+
+                rangerAtlasEntity      = new RangerAtlasEntity(typeName, guid, atlasEntity.getValues());
+                entityTypeName         = atlasEntity.getTypeName();
+                isEntityTypeHandled    = atlasEntity.getId().getState() == Id.EntityState.ACTIVE
+                                            && AtlasResourceMapperUtil.isEntityTypeHandled(entityTypeName);
+                isEntityDeleteOp       = EntityNotificationV1.OperationType.ENTITY_DELETE == v1Notification.getOperationType();
+                isEmptyClassifications = CollectionUtils.isEmpty(v1Notification.getAllTraits()); // true only when no traits are present
+            }
+            break;
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+
+                rangerAtlasEntity      = null;
+                entityTypeName         = null;
+                isEntityTypeHandled    = false;
+                isEntityDeleteOp       = false;
+                isEmptyClassifications = true;
+            }
+
+            break;
+        }
+    }
+
+    public RangerAtlasEntity getRangerAtlasEntity() {
+        return rangerAtlasEntity;
+    }
+
+    public String getEntityTypeName() {
+        return entityTypeName;
+    }
+
+    public boolean getIsEntityTypeHandled() {
+        return isEntityTypeHandled;
+    }
+
+    public boolean getIsEntityDeleteOp() {
+        return isEntityDeleteOp;
+    }
+    /** @return true when the notification carries no classifications/traits (always true for an unknown notification type). */
+    public boolean getIsEmptyClassifications() {
+        return isEmptyClassifications;
+    }
+    /** Maps the V1/V2 operation type onto NotificationType (V1 TRAIT_* become CLASSIFICATION_*); anything unrecognized yields UNKNOWN. */
+    public NotificationType getEntityNotificationType() {
+        NotificationType ret = NotificationType.UNKNOWN;
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                EntityNotificationV2.OperationType opType = ((EntityNotificationV2) notification).getOperationType();
+                switch (opType) {
+                    case ENTITY_CREATE:
+                        ret = NotificationType.ENTITY_CREATE;
+                        break;
+                    case ENTITY_UPDATE:
+                        ret = NotificationType.ENTITY_UPDATE;
+                        break;
+                    case ENTITY_DELETE:
+                        ret = NotificationType.ENTITY_DELETE;
+                        break;
+                    case CLASSIFICATION_ADD:
+                        ret = NotificationType.CLASSIFICATION_ADD;
+                        break;
+                    case CLASSIFICATION_UPDATE:
+                        ret = NotificationType.CLASSIFICATION_UPDATE;
+                        break;
+                    case CLASSIFICATION_DELETE:
+                        ret = NotificationType.CLASSIFICATION_DELETE;
+                        break;
+                    default:
+                        LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+                        break;
+                }
+                break;
+            }
+            case ENTITY_NOTIFICATION_V1: {
+                EntityNotificationV1.OperationType opType = ((EntityNotificationV1) notification).getOperationType();
+                switch (opType) {
+                    case ENTITY_CREATE:
+                        ret = NotificationType.ENTITY_CREATE;
+                        break;
+                    case ENTITY_UPDATE:
+                        ret = NotificationType.ENTITY_UPDATE;
+                        break;
+                    case ENTITY_DELETE:
+                        ret = NotificationType.ENTITY_DELETE;
+                        break;
+                    case TRAIT_ADD:
+                        ret = NotificationType.CLASSIFICATION_ADD;
+                        break;
+                    case TRAIT_UPDATE:
+                        ret = NotificationType.CLASSIFICATION_UPDATE;
+                        break;
+                    case TRAIT_DELETE:
+                        ret = NotificationType.CLASSIFICATION_DELETE;
+                        break;
+                    default:
+                        LOG.error("Received OperationType [" + opType + "], converting to UNKNOWN");
+                        break;
+                }
+                break;
+            }
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+            }
+            break;
+        }
+
+        return ret;
+    }
+    /** @return classification/trait name mapped to its attributes (values via toString(), null values skipped); empty map when there are none or the type is unknown. */
+    public Map<String, Map<String, String>> getAllClassifications() {
+        Map<String, Map<String, String>> ret = new HashMap<>();
+
+        switch (notificationType) {
+            case ENTITY_NOTIFICATION_V2: {
+                List<AtlasClassification> allClassifications = ((EntityNotificationV2) notification).getClassifications();
+                if (CollectionUtils.isNotEmpty(allClassifications)) {
+                    for (AtlasClassification classification : allClassifications) {
+                        String classificationName = classification.getTypeName();
+
+                        Map<String, Object> valuesMap  = classification.getAttributes();
+                        Map<String, String> attributes = new HashMap<>();
+                        if (valuesMap != null) {
+                            for (Map.Entry<String, Object> value : valuesMap.entrySet()) {
+                                if (value.getValue() != null) {
+                                    attributes.put(value.getKey(), value.getValue().toString());
+                                }
+                            }
+                        }
+                        ret.put(classificationName, attributes);
+                    }
+                }
+            }
+            break;
+            case ENTITY_NOTIFICATION_V1: {
+                List<Struct> allTraits = ((EntityNotificationV1) notification).getAllTraits();
+                if (CollectionUtils.isNotEmpty(allTraits)) {
+                    for (Struct trait : allTraits) {
+                        String traitName = trait.getTypeName();
+
+                        Map<String, Object> valuesMap  = trait.getValuesMap();
+                        Map<String, String> attributes = new HashMap<>();
+                        if (valuesMap != null) {
+                            for (Map.Entry<String, Object> value : valuesMap.entrySet()) {
+                                if (value.getValue() != null) {
+                                    attributes.put(value.getKey(), value.getValue().toString());
+                                }
+                            }
+                        }
+                        ret.put(traitName, attributes);
+                    }
+                }
+            }
+            break;
+            default: {
+                LOG.error("Unknown notification type - [" + notificationType + "]");
+            }
+            break;
+        }
+
+        return ret;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/ranger/blob/9ed21047/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
----------------------------------------------------------------------
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
index b25a241..ecbc502 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlasrest/RangerAtlasEntityWithTags.java
@@ -21,13 +21,9 @@ package org.apache.ranger.tagsync.source.atlasrest;
 import org.apache.atlas.type.AtlasClassificationType;
 import 
org.apache.atlas.type.AtlasStructType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.v1.model.notification.EntityNotificationV1; -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; import org.apache.commons.lang.StringUtils; +import org.apache.ranger.tagsync.source.atlas.EntityNotificationWrapper; -import java.util.HashMap; -import java.util.List; import java.util.Map; public class RangerAtlasEntityWithTags { @@ -35,31 +31,12 @@ public class RangerAtlasEntityWithTags { private final Map<String, Map<String, String>> tags; private final AtlasTypeRegistry typeRegistry; - public RangerAtlasEntityWithTags(EntityNotificationV1 notification ) { - Referenceable atlasEntity = notification.getEntity(); + public RangerAtlasEntityWithTags(EntityNotificationWrapper notification ) { - String guid = atlasEntity.getId()._getId(); - String typeName = atlasEntity.getTypeName(); + this.entity = notification.getRangerAtlasEntity(); - this.entity = new RangerAtlasEntity(typeName, guid, atlasEntity.getValues()); + this.tags = notification.getAllClassifications(); - this.tags = new HashMap<>(); - - List<Struct> allTraits = notification.getAllTraits(); - for (Struct trait : allTraits) { - String traitName = trait.getTypeName(); - - Map<String, Object> valuesMap = trait.getValuesMap(); - Map<String, String> attributes = new HashMap<>(); - if (valuesMap != null) { - for (Map.Entry<String, Object> value : valuesMap.entrySet()) { - if (value.getValue() != null) { - attributes.put(value.getKey(), value.getValue().toString()); - } - } - } - this.tags.put(traitName, attributes); - } this.typeRegistry = null; }