http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index 53acf56..4633de9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -21,44 +21,34 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; -import org.apache.atlas.notification.entity.EntityNotification; -import org.apache.atlas.notification.entity.EntityNotificationImpl; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType; import org.apache.atlas.repository.graph.GraphHelper; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.types.FieldMapping; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.atlas.type.AtlasClassificationType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; import org.apache.commons.configuration.Configuration; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; /** * Listen to the repository for entity changes and produce entity change notifications. */ @Component public class NotificationEntityChangeListener implements EntityChangeListener { + private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; - private final NotificationInterface notificationInterface; - private final TypeSystem typeSystem; + private final NotificationInterface notificationInterface; + private final AtlasTypeRegistry typeRegistry; + private final Map<String, List<String>> notificationAttributesCache = new HashMap<>(); - private Map<String, List<String>> notificationAttributesCache = new HashMap<>(); - private static final String ATLAS_ENTITY_NOTIFICATION_PROPERTY = "atlas.notification.entity"; - static Configuration APPLICATION_PROPERTIES = null; + private static Configuration APPLICATION_PROPERTIES = null; @@ -68,45 +58,45 @@ public class NotificationEntityChangeListener implements EntityChangeListener { * Construct a NotificationEntityChangeListener. * * @param notificationInterface the notification framework interface - * @param typeSystem the Atlas type system + * @param typeRegistry the Atlas type system */ @Inject - public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) { + public NotificationEntityChangeListener(NotificationInterface notificationInterface, AtlasTypeRegistry typeRegistry) { this.notificationInterface = notificationInterface; - this.typeSystem = typeSystem; + this.typeRegistry = typeRegistry; } // ----- EntityChangeListener ---------------------------------------------- @Override - public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE); + public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException { + notifyOfEntityEvent(entities, OperationType.ENTITY_CREATE); } @Override - public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE); + public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException { + notifyOfEntityEvent(entities, OperationType.ENTITY_UPDATE); } @Override - public void onTraitsAdded(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD); + public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException { + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_ADD); } @Override - public void onTraitsDeleted(ITypedReferenceableInstance entity, Collection<String> traitNames) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE); + public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException { + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE); } @Override - public void onTraitsUpdated(ITypedReferenceableInstance entity, Collection<? extends IStruct> traits) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_UPDATE); + public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException { + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_UPDATE); } @Override - public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE); + public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException { + notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE); } @@ -115,57 +105,52 @@ public class NotificationEntityChangeListener implements EntityChangeListener { // ----- helper methods ---------------------------------------------------- @VisibleForTesting - public static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition, - TypeSystem typeSystem) throws AtlasException { - List<IStruct> traitInfo = new LinkedList<>(); - for (String traitName : entityDefinition.getTraits()) { - IStruct trait = entityDefinition.getTrait(traitName); - String typeName = trait.getTypeName(); - Map<String, Object> valuesMap = trait.getValuesMap(); - traitInfo.add(new Struct(typeName, valuesMap)); - traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem)); - } - return traitInfo; - } + public static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) throws AtlasException { + List<Struct> ret = new ArrayList<>(); - private static List<IStruct> getSuperTraits( - String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException { + for (String traitName : entityDefinition.getTraitNames()) { + Struct trait = entityDefinition.getTrait(traitName); + AtlasClassificationType traitType = typeRegistry.getClassificationTypeByName(traitName); + Set<String> superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null; - List<IStruct> superTypes = new LinkedList<>(); + ret.add(trait); - TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName); - Set<String> superTypeNames = traitDef.getAllSuperTypeNames(); + if (CollectionUtils.isNotEmpty(superTypeNames)) { + for (String superTypeName : superTypeNames) { + Struct superTypeTrait = new Struct(superTypeName); - for (String superTypeName : superTypeNames) { - TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName); + if (MapUtils.isNotEmpty(trait.getValues())) { + AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName); - Map<String, Object> superTypeValues = new HashMap<>(); + if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) { + Map<String, Object> superTypeTraitAttributes = new HashMap<>(); - FieldMapping fieldMapping = superTraitDef.fieldMapping(); + for (Map.Entry<String, Object> attrEntry : trait.getValues().entrySet()) { + String attrName = attrEntry.getKey(); - if (fieldMapping != null) { - Set<String> superTypeAttributeNames = fieldMapping.fields.keySet(); + if (superType.getAllAttributes().containsKey(attrName)) { + superTypeTraitAttributes.put(attrName, attrEntry.getValue()); + } + } - for (String superTypeAttributeName : superTypeAttributeNames) { - if (values.containsKey(superTypeAttributeName)) { - superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName)); + superTypeTrait.setValues(superTypeTraitAttributes); + } } + + ret.add(superTypeTrait); } } - IStruct superTrait = new Struct(superTypeName, superTypeValues); - superTypes.add(superTrait); - superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem)); } - return superTypes; + return ret; } // send notification of entity change - private void notifyOfEntityEvent(Collection<ITypedReferenceableInstance> entityDefinitions, - EntityNotification.OperationType operationType) throws AtlasException { - List<EntityNotification> messages = new LinkedList<>(); + private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions, + OperationType operationType) throws AtlasException { + List<EntityNotificationV1> messages = new ArrayList<>(); - for (IReferenceableInstance entityDefinition : entityDefinitions) { + for (Referenceable entityDefinition : entityDefinitions) { if(GraphHelper.isInternalType(entityDefinition.getTypeName())) { continue; } @@ -182,13 +167,13 @@ public class NotificationEntityChangeListener implements EntityChangeListener { } } - EntityNotificationImpl notification = new EntityNotificationImpl(entity, operationType, getAllTraits(entity, typeSystem)); + EntityNotificationV1 notification = new EntityNotificationV1(entity, operationType, getAllTraits(entity, typeRegistry)); messages.add(notification); } if (!messages.isEmpty()) { - notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages); + notificationInterface.send(NotificationType.ENTITIES, messages); } }
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 4646bff..456a778 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -25,17 +25,18 @@ import org.apache.atlas.AtlasBaseClient; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.model.instance.AtlasEntity; -import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest; -import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; @@ -43,7 +44,6 @@ import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; import org.apache.atlas.service.Service; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.service.ServiceState; @@ -57,10 +57,7 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -77,37 +74,37 @@ import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE; @Order(4) @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV1"}) public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { - private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); - private static final String LOCALHOST = "localhost"; - private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); + private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final String LOCALHOST = "localhost"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); - public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; - public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; + public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; + public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; - public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; - public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; - public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; - + public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; + public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; + public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final AtlasEntityStore atlasEntityStore; - private final ServiceState serviceState; + + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; - private final AtlasTypeRegistry typeRegistry; - private final int maxRetries; - private final int failedMsgCacheSize; + private final AtlasTypeRegistry typeRegistry; + private final int maxRetries; + private final int failedMsgCacheSize; + private final int minWaitDuration; + private final int maxWaitDuration; + + private NotificationInterface notificationInterface; + private ExecutorService executors; + private Configuration applicationProperties; @VisibleForTesting final int consumerRetryInterval; - private final int minWaitDuration; - private final int maxWaitDuration; - - private NotificationInterface notificationInterface; - private ExecutorService executors; - private Configuration applicationProperties; @VisibleForTesting List<HookConsumer> consumers; @@ -117,18 +114,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ServiceState serviceState, AtlasInstanceConverter instanceConverter, AtlasTypeRegistry typeRegistry) throws AtlasException { this.notificationInterface = notificationInterface; - this.atlasEntityStore = atlasEntityStore; - this.serviceState = serviceState; - this.instanceConverter = instanceConverter; - this.typeRegistry = typeRegistry; - + this.atlasEntityStore = atlasEntityStore; + this.serviceState = serviceState; + this.instanceConverter = instanceConverter; + this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); - maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); - failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); + failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); - minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default - maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default + minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default + maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default } @Override @@ -145,21 +141,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (!HAConfiguration.isHAEnabled(configuration)) { LOG.info("HA is disabled, starting consumers inline."); + startConsumers(executorService); } } private void startConsumers(ExecutorService executorService) { - int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List<NotificationConsumer<HookNotificationMessage>> notificationConsumers = - notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); + int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); + List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads); + if (executorService == null) { - executorService = Executors.newFixedThreadPool(notificationConsumers.size(), - new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); + executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); } + executors = executorService; - for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) { + + for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) { HookConsumer hookConsumer = new HookConsumer(consumer); + consumers.add(hookConsumer); executors.submit(hookConsumer); } @@ -172,11 +171,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl stopConsumerThreads(); if (executors != null) { executors.shutdown(); + if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } + executors = null; } + notificationInterface.close(); } catch (InterruptedException e) { LOG.error("Failure in shutting down consumers"); @@ -190,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl for (HookConsumer consumer : consumers) { consumer.shutdown(); } + consumers.clear(); } @@ -205,6 +208,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void instanceIsActive() { LOG.info("Reacting to active state: initializing Kafka consumers"); + startConsumers(executors); } @@ -217,6 +221,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void instanceIsPassive() { LOG.info("Reacting to passive state: shutting down Kafka consumers."); + stop(); } @@ -236,18 +241,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final long maxDuration; private final long minDuration; private final long resetInterval; + private long lastWaitAt; - private long lastWaitAt; @VisibleForTesting long waitDuration; public AdaptiveWaiter(long minDuration, long maxDuration, long increment) { - this.minDuration = minDuration; - this.maxDuration = maxDuration; - this.increment = increment; - - this.waitDuration = minDuration; - this.lastWaitAt = 0; + this.minDuration = minDuration; + this.maxDuration = maxDuration; + this.increment = increment; + this.waitDuration = minDuration; + this.lastWaitAt = 0; this.resetInterval = maxDuration * 2; } @@ -269,7 +273,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void setWaitDurations() { long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt; + lastWaitAt = System.currentTimeMillis(); + if (timeSinceLastWait > resetInterval) { waitDuration = minDuration; } else { @@ -283,14 +289,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @VisibleForTesting class HookConsumer extends ShutdownableThread { - private final NotificationConsumer<HookNotificationMessage> consumer; - private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private List<HookNotificationMessage> failedMessages = new ArrayList<>(); - - private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); + private final NotificationConsumer<HookNotification> consumer; + private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private final List<HookNotification> failedMessages = new ArrayList<>(); + private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); - public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { + public HookConsumer(NotificationConsumer<HookNotification> consumer) { super("atlas-hook-consumer-thread", false); + this.consumer = consumer; } @@ -307,8 +313,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (shouldRun.get()) { try { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); - for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); + + for (AtlasKafkaMessage<HookNotification> msg : messages) { handleMessage(msg); } } catch (IllegalStateException ex) { @@ -316,6 +323,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } catch (Exception e) { if (shouldRun.get()) { LOG.warn("Exception in NotificationHookConsumer", e); + adaptiveWaiter.pause(e); } else { break; @@ -325,6 +333,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } finally { if (consumer != null) { LOG.info("closing NotificationConsumer"); + consumer.close(); } @@ -333,11 +342,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException { - AtlasPerfTracer perf = null; - - HookNotificationMessage message = kafkaMsg.getMessage(); - String messageUser = message.getUser(); + void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException { + AtlasPerfTracer perf = null; + HookNotification message = kafkaMsg.getMessage(); + String messageUser = message.getUser(); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); @@ -345,21 +353,25 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { // Used for intermediate conversions during create and update - AtlasEntity.AtlasEntitiesWithExtInfo entities; + AtlasEntitiesWithExtInfo entities = null; + for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); } + try { - RequestContext requestContext = RequestContext.createContext(); + RequestContextV1 requestContext = RequestContextV1.get(); + requestContext.setUser(messageUser); switch (message.getType()) { case ENTITY_CREATE: - EntityCreateRequest createRequest = (EntityCreateRequest) message; + final EntityCreateRequest createRequest = (EntityCreateRequest) message; if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY; + audit(messageUser, api.getMethod(), api.getNormalizedPath()); } @@ -373,19 +385,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE; - audit(messageUser, api.getMethod(), - String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName())); + + audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName())); } Referenceable referenceable = partialUpdateRequest.getEntity(); + entities = instanceConverter.toAtlasEntity(referenceable); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); - String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() { - { - put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); - } - }); + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue())); // There should only be one root entity entities.getEntities().get(0).setGuid(guid); @@ -398,30 +407,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE; - audit(messageUser, api.getMethod(), - String.format(api.getNormalizedPath(), deleteRequest.getTypeName())); + + audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName())); } try { AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); - atlasEntityStore.deleteByUniqueAttributes(type, - new HashMap<String, Object>() {{ - put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); - }}); + + atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); } catch (ClassCastException cle) { LOG.error("Failed to do a partial update on Entity"); } break; case ENTITY_FULL_UPDATE: - EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; + final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = UPDATE_ENTITY; + audit(messageUser, api.getMethod(), api.getNormalizedPath()); } entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); break; @@ -434,6 +443,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.warn("Error handling message", e); try { LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + Thread.sleep(consumerRetryInterval); } catch (InterruptedException ie) { LOG.error("Notification consumer thread sleep interrupted"); @@ -441,14 +451,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == (maxRetries - 1)) { LOG.warn("Max retries exceeded for message {}", message, e); + failedMessages.add(message); + if (failedMessages.size() >= failedMsgCacheSize) { recordFailedMessages(); } return; } } finally { - RequestContext.clear(); RequestContextV1.clear(); } } @@ -460,15 +471,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotificationMessage message : failedMessages) { + for (HookNotification message : failedMessages) { FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); } + failedMessages.clear(); } - private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) { + private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) { recordFailedMessages(); + TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + consumer.commit(partition, kafkaMessage.getOffset() + 1); } @@ -476,22 +490,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) { try { - LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", - SERVER_READY_WAIT_TIME_MS); + LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS); + timer.sleep(SERVER_READY_WAIT_TIME_MS); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for Atlas Server to become ready, " - + "exiting consumer thread.", e); + LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e); + return false; } } } catch (Throwable e) { - LOG.info( - "Handled AtlasServiceException while waiting for Atlas Server to become ready, " - + "exiting consumer thread.", e); + LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e); + return false; } + LOG.info("Atlas Server is ready, can start reading Kafka events."); + return true; } @@ -506,12 +521,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } super.initiateShutdown(); + shouldRun.set(false); + if (consumer != null) { consumer.wakeup(); } super.awaitShutdown(); + LOG.info("<== HookConsumer shutdown()"); } } @@ -521,7 +539,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.debug("==> audit({},{}, {})", messageUser, method, path); } - AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, - DateTimeHelper.formatDateUTC(new Date())); + AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java b/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java index a33d8d7..bc0b440 100644 --- a/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java +++ b/webapp/src/main/java/org/apache/atlas/web/errors/NotFoundExceptionMapper.java @@ -17,7 +17,7 @@ */ package org.apache.atlas.web.errors; -import org.apache.atlas.typesystem.exception.NotFoundException; +import org.apache.atlas.exception.NotFoundException; import org.springframework.stereotype.Component; import javax.ws.rs.core.Response; http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java index e8020db..1d553e0 100644 --- a/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java +++ b/webapp/src/main/java/org/apache/atlas/web/filters/AtlasAuthenticationFilter.java @@ -19,7 +19,7 @@ package org.apache.atlas.web.filters; import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.RequestContext; +import org.apache.atlas.RequestContextV1; import org.apache.atlas.security.SecurityProperties; import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.web.security.AtlasAuthenticationProvider; @@ -311,7 +311,7 @@ public class AtlasAuthenticationFilter extends AuthenticationFilter { try { String requestUser = httpRequest.getRemoteUser(); NDC.push(requestUser + ":" + httpRequest.getMethod() + httpRequest.getRequestURI()); - RequestContext requestContext = RequestContext.get(); + RequestContextV1 requestContext = RequestContextV1.get(); if (requestContext != null) { requestContext.setUser(requestUser); } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java index 191388a..3225b0e 100755 --- a/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java +++ b/webapp/src/main/java/org/apache/atlas/web/filters/AuditFilter.java @@ -20,7 +20,6 @@ package org.apache.atlas.web.filters; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; -import org.apache.atlas.RequestContext; import org.apache.atlas.RequestContextV1; import org.apache.atlas.metrics.Metrics; import org.apache.atlas.util.AtlasRepositoryConfiguration; @@ -70,7 +69,8 @@ public class AuditFilter implements Filter { try { currentThread.setName(formatName(oldName, requestId)); - RequestContext requestContext = RequestContext.createContext(); + RequestContextV1.clear(); + RequestContextV1 requestContext = RequestContextV1.get(); requestContext.setUser(user); recordAudit(httpRequest, requestTimeISO9601, user); filterChain.doFilter(request, response); @@ -79,7 +79,6 @@ public class AuditFilter implements Filter { ((HttpServletResponse) response).setHeader(AtlasClient.REQUEST_ID, requestId); currentThread.setName(oldName); recordMetrics(); - RequestContext.clear(); RequestContextV1.clear(); } } @@ -120,7 +119,7 @@ public class AuditFilter implements Filter { public static void recordMetrics() { //record metrics - Metrics requestMetrics = RequestContext.getMetrics(); + Metrics requestMetrics = RequestContextV1.getMetrics(); if (!requestMetrics.isEmpty()) { METRICS_LOG.info("{}", requestMetrics); } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java index 435659e..5660c5b 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/DataSetLineageResource.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,13 +18,21 @@ package org.apache.atlas.web.resources; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.discovery.DiscoveryException; -import org.apache.atlas.discovery.LineageService; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.discovery.AtlasLineageService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.lineage.AtlasLineageInfo; +import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.type.AtlasEntityType; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.v1.model.lineage.DataSetLineageResponse; +import org.apache.atlas.v1.model.lineage.SchemaResponse; +import org.apache.atlas.web.util.LineageUtils; import org.apache.atlas.web.util.Servlets; -import org.codehaus.jettison.json.JSONObject; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -32,9 +40,18 @@ import org.springframework.stereotype.Service; import javax.inject.Inject; import javax.inject.Singleton; import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.*; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.atlas.v1.model.lineage.SchemaResponse.SchemaDetails; /** * Jersey Resource for Hive Table Lineage. @@ -45,20 +62,18 @@ import javax.ws.rs.core.Response; @Deprecated public class DataSetLineageResource { - private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); + private static final Logger LOG = LoggerFactory.getLogger(DataSetLineageResource.class); private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.DataSetLineageResource"); - private final LineageService lineageService; + private final AtlasLineageService atlasLineageService; + private final AtlasTypeRegistry typeRegistry; + private final AtlasEntityStore atlasEntityStore; - /** - * Created by the Guice ServletModule and injected with the - * configured LineageService. - * - * @param lineageService lineage service handle - */ @Inject - public DataSetLineageResource(LineageService lineageService) { - this.lineageService = lineageService; + public DataSetLineageResource(final AtlasLineageService atlasLineageService, final AtlasTypeRegistry typeRegistry, final AtlasEntityStore atlasEntityStore) { + this.atlasLineageService = atlasLineageService; + this.typeRegistry = typeRegistry; + this.atlasEntityStore = atlasEntityStore; } /** @@ -70,30 +85,28 @@ public class DataSetLineageResource { @Path("table/{tableName}/inputs/graph") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + public DataSetLineageResponse inputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { if (LOG.isDebugEnabled()) { LOG.debug("==> DataSetLineageResource.inputsGraph({})", tableName); } - AtlasPerfTracer perf = null; + DataSetLineageResponse ret = new DataSetLineageResponse(); + AtlasPerfTracer perf = null; try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.inputsGraph(tableName=" + tableName + ")"); } - final String jsonResult = lineageService.getInputsGraph(tableName); + String guid = getGuid(tableName); - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1); + ret.setTableName(tableName); + ret.setRequestId(Servlets.getRequestId()); + ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry)); - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { + return ret; + } catch (IllegalArgumentException e) { LOG.error("Unable to get lineage inputs graph for table {}", tableName, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -116,30 +129,28 @@ public class DataSetLineageResource { @Path("table/{tableName}/outputs/graph") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + public DataSetLineageResponse outputsGraph(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { if (LOG.isDebugEnabled()) { LOG.debug("==> DataSetLineageResource.outputsGraph({})", tableName); } - AtlasPerfTracer perf = null; + DataSetLineageResponse ret = new DataSetLineageResponse(); + AtlasPerfTracer perf = null; try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.outputsGraph(tableName=" + tableName + ")"); } - final String jsonResult = lineageService.getOutputsGraph(tableName); + String guid = getGuid(tableName); - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); + AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1); + ret.setTableName(tableName); + ret.setRequestId(Servlets.getRequestId()); + ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry)); - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { + return ret; + } catch (IllegalArgumentException e) { LOG.error("Unable to get lineage outputs graph for table {}", tableName, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -162,30 +173,26 @@ public class DataSetLineageResource { @Path("table/{tableName}/schema") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { + public SchemaResponse schema(@Context HttpServletRequest request, @PathParam("tableName") String tableName) { if (LOG.isDebugEnabled()) { LOG.debug("==> DataSetLineageResource.schema({})", tableName); } AtlasPerfTracer perf = null; + SchemaResponse ret = new SchemaResponse(); try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "DataSetLineageResource.schema(tableName=" + tableName + ")"); } - final String jsonResult = lineageService.getSchema(tableName); + SchemaDetails schemaDetails = atlasLineageService.getSchemaForHiveTableByName(tableName); - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put("tableName", tableName); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); - - return Response.ok(response).build(); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", tableName); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { + ret.setRequestId(Servlets.getRequestId()); + ret.setTableName(tableName); + ret.setResults(schemaDetails); + return ret; + } catch (IllegalArgumentException e) { LOG.error("Unable to get schema for table {}", tableName, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -198,4 +205,20 @@ public class DataSetLineageResource { AtlasPerfTracer.log(perf); } } + + private String getGuid(String tableName) throws AtlasBaseException { + if (StringUtils.isEmpty(tableName)) { + // TODO: Fix the error code if mismatch + throw new AtlasBaseException(AtlasErrorCode.BAD_REQUEST); + } + Map<String, Object> lookupAttributes = new HashMap<>(); + lookupAttributes.put("qualifiedName", tableName); + AtlasEntityType entityType = typeRegistry.getEntityTypeByName("hive_table"); + AtlasEntity.AtlasEntityWithExtInfo hive_table = atlasEntityStore.getByUniqueAttributes(entityType, lookupAttributes); + if (hive_table != null) { + return hive_table.getEntity().getGuid(); + } else { + throw new AtlasBaseException(AtlasErrorCode.INSTANCE_NOT_FOUND, tableName); + } + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java index 8b56507..11879e6 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/EntityResource.java @@ -20,39 +20,34 @@ package org.apache.atlas.web.resources; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.sun.jersey.api.core.ResourceContext; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasErrorCode; import org.apache.atlas.AtlasException; import org.apache.atlas.CreateUpdateEntitiesResult; import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.discovery.AtlasDiscoveryService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasClassification; -import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.instance.GuidMapping; import org.apache.atlas.model.legacy.EntityResult; +import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1; -import org.apache.atlas.services.MetadataService; import org.apache.atlas.type.AtlasEntityType; import org.apache.atlas.type.AtlasType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.ITypedReferenceableInstance; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.exception.EntityExistsException; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.atlas.typesystem.persistence.Id; -import org.apache.atlas.typesystem.types.ValueConversionException; +import org.apache.atlas.type.AtlasTypeUtil; import org.apache.atlas.utils.AtlasPerfTracer; import org.apache.atlas.utils.ParamChecker; +import org.apache.atlas.v1.model.instance.Id; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.web.rest.EntityREST; import org.apache.atlas.web.util.Servlets; import org.apache.commons.collections.CollectionUtils; @@ -98,32 +93,29 @@ public class EntityResource { private static final String TRAIT_NAME = "traitName"; - private final MetadataService metadataService; private final AtlasInstanceConverter restAdapters; private final AtlasEntityStore entitiesStore; private final AtlasTypeRegistry typeRegistry; - private final EntityREST entityREST; + private final EntityREST entityREST; + private final EntityAuditRepository entityAuditRepository; + private final AtlasDiscoveryService atlasDiscoveryService; @Context UriInfo uriInfo; - @Context - private ResourceContext resourceContext; - - /** - * Created by the Guice ServletModule and injected with the - * configured MetadataService. - * - * @param metadataService metadata service handle - */ @Inject - public EntityResource(MetadataService metadataService, AtlasInstanceConverter restAdapters, - AtlasEntityStore entitiesStore, AtlasTypeRegistry typeRegistry, EntityREST entityREST) { - this.metadataService = metadataService; - this.restAdapters = restAdapters; - this.entitiesStore = entitiesStore; - this.typeRegistry = typeRegistry; + public EntityResource(final AtlasInstanceConverter restAdapters, + final AtlasEntityStore entitiesStore, + final AtlasTypeRegistry typeRegistry, + final EntityREST entityREST, + final EntityAuditRepository entityAuditRepository, + final AtlasDiscoveryService atlasDiscoveryService) { + this.restAdapters = restAdapters; + this.entitiesStore = entitiesStore; + this.typeRegistry = typeRegistry; this.entityREST = entityREST; + this.entityAuditRepository = entityAuditRepository; + this.atlasDiscoveryService = atlasDiscoveryService; } /** @@ -149,22 +141,28 @@ public class EntityResource { String entities = Servlets.getRequestPayload(request); //Handle backward compatibility - if entities is not JSONArray, convert to JSONArray + JSONArray jsonEntities = null; + try { - new JSONArray(entities); + jsonEntities = new JSONArray(entities); } catch (JSONException e) { final String finalEntities = entities; - entities = new JSONArray() {{ + jsonEntities = new JSONArray() {{ put(finalEntities); - }}.toString(); + }}; } - entityJson = AtlasClient.toString(new JSONArray(entities)); + String[] jsonStrings = new String[jsonEntities.length()]; + + for (int i = 0; i < jsonEntities.length(); i++) { + jsonStrings[i] = jsonEntities.getString(i); + } if (LOG.isDebugEnabled()) { - LOG.debug("submitting entities {} ", entityJson); + LOG.debug("submitting entities {} ", jsonEntities); } - AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(entities); + AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(jsonStrings); EntityMutationResponse mutationResponse = entityREST.createOrUpdate(entitiesInfo); final List<String> guids = restAdapters.getGuids(mutationResponse.getCreatedEntities()); @@ -183,12 +181,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to persist entity instance entityDef={}", entityJson, e); throw toWebApplicationException(e); - } catch(EntityExistsException e) { - LOG.error("Unique constraint violation for entity entityDef={}", entityJson, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT)); - } catch (ValueConversionException ve) { - LOG.error("Unable to persist entity instance due to a deserialization error entityDef={}", entityJson, ve); - throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause() != null ? ve.getCause() : ve, Response.Status.BAD_REQUEST)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to persist entity instance entityDef={}", entityJson, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -224,13 +216,13 @@ public class EntityResource { return locationURI; } - private JSONObject getResponse(EntityResult entityResult) throws AtlasException, JSONException { + private JSONObject getResponse(EntityResult entityResult) throws AtlasBaseException, AtlasException, JSONException { CreateUpdateEntitiesResult result = new CreateUpdateEntitiesResult(); result.setEntityResult(entityResult); return getResponse(result); } - private JSONObject getResponse(CreateUpdateEntitiesResult result) throws AtlasException, JSONException { + private JSONObject getResponse(CreateUpdateEntitiesResult result) throws AtlasBaseException, AtlasException, JSONException { JSONObject response = new JSONObject(); EntityResult entityResult = result.getEntityResult(); GuidMapping mapping = result.getGuidMapping(); @@ -239,12 +231,12 @@ public class EntityResource { response.put(AtlasClient.ENTITIES, new JSONObject(entityResult.toString()).get(AtlasClient.ENTITIES)); String sampleEntityId = getSample(result.getEntityResult()); if (sampleEntityId != null) { - String entityDefinition = metadataService.getEntityDefinitionJson(sampleEntityId); + String entityDefinition = getEntityJson(sampleEntityId); response.put(AtlasClient.DEFINITION, new JSONObject(entityDefinition)); } } if(mapping != null) { - response.put(AtlasClient.GUID_ASSIGNMENTS, new JSONObject(AtlasType.toJson(mapping)).get(AtlasClient.GUID_ASSIGNMENTS)); + response.put(AtlasClient.GUID_ASSIGNMENTS, new JSONObject(AtlasType.toV1Json(mapping)).get(AtlasClient.GUID_ASSIGNMENTS)); } return response; } @@ -270,14 +262,18 @@ public class EntityResource { } final String entities = Servlets.getRequestPayload(request); + JSONArray jsonEntities = new JSONArray(entities); + String[] jsonStrings = new String[jsonEntities.length()]; - entityJson = AtlasClient.toString(new JSONArray(entities)); + for (int i = 0; i < jsonEntities.length(); i++) { + jsonStrings[i] = jsonEntities.getString(i); + } if (LOG.isDebugEnabled()) { LOG.info("updating entities {} ", entityJson); } - AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(entities); + AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntities(jsonStrings); EntityMutationResponse mutationResponse = entityREST.createOrUpdate(entitiesInfo); CreateUpdateEntitiesResult result = restAdapters.toCreateUpdateEntitiesResult(mutationResponse); @@ -290,12 +286,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to persist entity instance entityDef={}", entityJson, e); throw toWebApplicationException(e); - } catch(EntityExistsException e) { - LOG.error("Unique constraint violation for entityDef={}", entityJson, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT)); - } catch (ValueConversionException ve) { - LOG.error("Unable to persist entity instance due to a deserialization error entityDef={}", entityJson, ve); - throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to persist entity instance entityDef={}", entityJson, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -367,7 +357,7 @@ public class EntityResource { LOG.debug("Partially updating entity by unique attribute {} {} {} {} ", entityType, attribute, value, entityJson); } - Referenceable updatedEntity = InstanceSerialization.fromJsonReferenceable(entityJson, true); + Referenceable updatedEntity = AtlasType.fromV1Json(entityJson, Referenceable.class); entityType = ParamChecker.notEmpty(entityType, "Entity type cannot be null"); attribute = ParamChecker.notEmpty(attribute, "attribute name cannot be null"); @@ -379,10 +369,10 @@ public class EntityResource { // update referenceable with Id if not specified in payload Id updateId = updatedEntity.getId(); - if (updateId != null && !updateId.isAssigned()) { + if (updateId != null && !AtlasTypeUtil.isAssignedGuid(updateId.getId())) { String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(getEntityType(entityType), attributes); - updatedEntity.replaceWithNewId(new Id(guid, 0, updatedEntity.getTypeName())); + updatedEntity.setId(new Id(guid, 0, updatedEntity.getTypeName())); } AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntity(updatedEntity); @@ -398,15 +388,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to partially update entity {} {}:{}.{}", entityJson, entityType, attribute, value, e); throw toWebApplicationException(e); - } catch (ValueConversionException ve) { - LOG.error("Unable to persist entity instance due to a deserialization error {} ", entityJson, ve); - throw new WebApplicationException(Servlets.getErrorResponse(ve.getCause(), Response.Status.BAD_REQUEST)); - } catch(EntityExistsException e) { - LOG.error("Unique constraint violation for entity {} ", entityJson, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.CONFLICT)); - } catch (EntityNotFoundException e) { - LOG.error("An entity with type={} and qualifiedName={} does not exist {} ", entityType, value, entityJson, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to partially update entity {} {}:{}.{}", entityJson, entityType, attribute, value, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -474,13 +455,13 @@ public class EntityResource { LOG.debug("partially updating entity for guid {} : {} ", guid, entityJson); } - Referenceable updatedEntity = InstanceSerialization.fromJsonReferenceable(entityJson, true); + Referenceable updatedEntity = AtlasType.fromV1Json(entityJson, Referenceable.class); // update referenceable with Id if not specified in payload Id updateId = updatedEntity.getId(); - if (updateId != null && !updateId.isAssigned()) { - updatedEntity.replaceWithNewId(new Id(guid, 0, updatedEntity.getTypeName())); + if (updateId != null && !AtlasTypeUtil.isAssignedGuid(updateId.getId())) { + updatedEntity.setId(new Id(guid, 0, updatedEntity.getTypeName())); } AtlasEntitiesWithExtInfo entitiesInfo = restAdapters.toAtlasEntity(updatedEntity); @@ -496,9 +477,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to update entity by GUID {} {} ", guid, entityJson, e); throw toWebApplicationException(e); - } catch (EntityNotFoundException e) { - LOG.error("An entity with GUID={} does not exist {} ", guid, entityJson, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to update entity by GUID {} {}", guid, entityJson, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -544,9 +522,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to add property {} to entity id {} {} ", property, guid, value, e); throw toWebApplicationException(e); - } catch (EntityNotFoundException e) { - LOG.error("An entity with GUID={} does not exist {} ", guid, value, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to add property {} to entity id {} {} ", property, guid, value, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -617,13 +592,6 @@ public class EntityResource { } catch (AtlasBaseException e) { LOG.error("Unable to delete entities {} {} {} {} ", guids, entityType, attribute, value, e); throw toWebApplicationException(e); - } catch (EntityNotFoundException e) { - if(guids != null && !guids.isEmpty()) { - LOG.error("An entity with GUID={} does not exist ", guids, e); - } else { - LOG.error("An entity with qualifiedName {}-{}-{} does not exist", entityType, attribute, value, e); - } - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); } catch (AtlasException | IllegalArgumentException e) { LOG.error("Unable to delete entities {} {} {} {} ", guids, entityType, attribute, value, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); @@ -666,7 +634,8 @@ public class EntityResource { } guid = ParamChecker.notEmpty(guid, "guid cannot be null"); - final String entityDefinition = metadataService.getEntityDefinitionJson(guid); + + String entityDefinition = getEntityJson(guid); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); @@ -681,11 +650,7 @@ public class EntityResource { } return Response.status(status).entity(response).build(); - - } catch (EntityNotFoundException e) { - LOG.error("An entity with GUID={} does not exist ", guid, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (AtlasException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Bad GUID={} ", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -716,19 +681,19 @@ public class EntityResource { LOG.debug("Fetching entity list for type={} ", entityType); } - final List<String> entityList = metadataService.getEntityList(entityType); + List<String> entityGUIDS = entitiesStore.getEntityGUIDS(entityType); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); response.put(AtlasClient.TYPENAME, entityType); - response.put(AtlasClient.RESULTS, new JSONArray(entityList)); - response.put(AtlasClient.COUNT, entityList.size()); + response.put(AtlasClient.RESULTS, new JSONArray(entityGUIDS)); + response.put(AtlasClient.COUNT, entityGUIDS.size()); return Response.ok(response).build(); } catch (NullPointerException e) { LOG.error("Entity type cannot be null", e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); - } catch (AtlasException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Unable to get entity list for type {}", entityType, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -804,10 +769,9 @@ public class EntityResource { String entityDefinition = null; if (entityInfo != null) { - AtlasEntity entity = entityInfo.getEntity(); - final ITypedReferenceableInstance instance = restAdapters.getITypedReferenceable(entity); + Referenceable instance = restAdapters.getReferenceable(entityInfo); - entityDefinition = InstanceSerialization.toJson(instance, true); + entityDefinition = AtlasType.toV1Json(instance); } JSONObject response = new JSONObject(); @@ -926,8 +890,8 @@ public class EntityResource { JSONArray traits = new JSONArray(); for (AtlasClassification classification : classifications) { - IStruct trait = restAdapters.getTrait(classification); - traits.put(new JSONObject(InstanceSerialization.toJson(trait, true))); + Struct trait = restAdapters.getTrait(classification); + traits.put(new JSONObject(AtlasType.toV1Json(trait))); } JSONObject response = new JSONObject(); @@ -984,11 +948,11 @@ public class EntityResource { final AtlasClassification classification = entitiesStore.getClassification(guid, traitName); - IStruct traitDefinition = restAdapters.getTrait(classification); + Struct traitDefinition = restAdapters.getTrait(classification); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(InstanceSerialization.toJson(traitDefinition, true))); + response.put(AtlasClient.RESULTS, new JSONObject(AtlasType.toV1Json(traitDefinition))); return Response.ok(response).build(); @@ -1044,7 +1008,7 @@ public class EntityResource { add(guid); }}; - entitiesStore.addClassification(guids, restAdapters.getClassification(InstanceSerialization.fromJsonStruct(traitDefinition, true))); + entitiesStore.addClassification(guids, restAdapters.toAtlasClassification(AtlasType.fromV1Json(traitDefinition, Struct.class))); URI locationURI = getLocationURI(new ArrayList<String>() {{ add(guid); @@ -1160,13 +1124,13 @@ public class EntityResource { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "EntityResource.getAuditEvents(" + guid + ", " + startKey + ", " + count + ")"); } - List<EntityAuditEvent> events = metadataService.getAuditEvents(guid, startKey, count); + List<EntityAuditEvent> events = entityAuditRepository.listEvents(guid, startKey, count); JSONObject response = new JSONObject(); response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); response.put(AtlasClient.EVENTS, getJSONArray(events)); return Response.ok(response).build(); - } catch (AtlasException | IllegalArgumentException e) { + } catch (IllegalArgumentException e) { LOG.error("Unable to get audit events for entity guid={} startKey={}", guid, startKey, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) { @@ -1216,4 +1180,12 @@ public class EntityResource { return new WebApplicationException(Servlets.getErrorResponse(e, e.getAtlasErrorCode().getHttpCode())); } + + private String getEntityJson(String guid) throws AtlasBaseException { + AtlasEntityWithExtInfo entity = entitiesStore.getById(guid); + Referenceable referenceable = restAdapters.getReferenceable(entity); + String entityJson = AtlasType.toV1Json(referenceable); + + return entityJson; + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java index cba8ccf..891e4d7 100644 --- a/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/LineageResource.java @@ -18,21 +18,16 @@ package org.apache.atlas.web.resources; -import org.apache.atlas.AtlasClient; import org.apache.atlas.discovery.AtlasLineageService; -import org.apache.atlas.discovery.DiscoveryException; -import org.apache.atlas.discovery.LineageService; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.lineage.AtlasLineageInfo; import org.apache.atlas.model.lineage.AtlasLineageInfo.LineageDirection; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.typesystem.exception.EntityNotFoundException; -import org.apache.atlas.typesystem.exception.SchemaNotFoundException; import org.apache.atlas.utils.AtlasPerfTracer; +import org.apache.atlas.v1.model.lineage.LineageResponse; +import org.apache.atlas.v1.model.lineage.SchemaResponse; import org.apache.atlas.web.util.LineageUtils; import org.apache.atlas.web.util.Servlets; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -56,18 +51,15 @@ public class LineageResource { private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger("rest.LineageResource"); private final AtlasLineageService atlasLineageService; - private final LineageService lineageService; private final AtlasTypeRegistry typeRegistry; /** * Created by the Guice ServletModule and injected with the * configured LineageService. * - * @param lineageService lineage service handle */ @Inject - public LineageResource(LineageService lineageService, AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) { - this.lineageService = lineageService; + public LineageResource(AtlasLineageService atlasLineageService, AtlasTypeRegistry typeRegistry) { this.atlasLineageService = atlasLineageService; this.typeRegistry = typeRegistry; } @@ -81,11 +73,13 @@ public class LineageResource { @Path("{guid}/inputs/graph") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response inputsGraph(@PathParam("guid") String guid) { + public LineageResponse inputsGraph(@PathParam("guid") String guid) { if (LOG.isDebugEnabled()) { LOG.debug("==> LineageResource.inputsGraph({})", guid); } + LineageResponse ret = new LineageResponse(); + AtlasPerfTracer perf = null; try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { @@ -93,22 +87,16 @@ public class LineageResource { } AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.INPUT, -1); - final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry); - - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(result)); + ret.setRequestId(Servlets.getRequestId()); + ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry)); - return Response.ok(response).build(); + return ret; } catch (AtlasBaseException e) { LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e)); } catch (WebApplicationException e) { LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); throw e; - } catch (JSONException e) { - LOG.error("Unable to get lineage inputs graph for entity guid={}", guid, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); } finally { AtlasPerfTracer.log(perf); @@ -127,11 +115,13 @@ public class LineageResource { @Path("{guid}/outputs/graph") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response outputsGraph(@PathParam("guid") String guid) { + public LineageResponse outputsGraph(@PathParam("guid") String guid) { if (LOG.isDebugEnabled()) { LOG.debug("==> LineageResource.outputsGraph({})", guid); } + LineageResponse ret = new LineageResponse(); + AtlasPerfTracer perf = null; try { @@ -140,22 +130,16 @@ public class LineageResource { } AtlasLineageInfo lineageInfo = atlasLineageService.getAtlasLineageInfo(guid, LineageDirection.OUTPUT, -1); - final String result = LineageUtils.toLineageStruct(lineageInfo, typeRegistry); + ret.setRequestId(Servlets.getRequestId()); + ret.setResults(LineageUtils.toLineageStruct(lineageInfo, typeRegistry)); - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(result)); - - return Response.ok(response).build(); + return ret; } catch (AtlasBaseException e) { LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e)); } catch (WebApplicationException e) { LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); throw e; - } catch (JSONException e) { - LOG.error("Unable to get lineage outputs graph for entity guid={}", guid, e); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.INTERNAL_SERVER_ERROR)); } finally { AtlasPerfTracer.log(perf); @@ -174,31 +158,26 @@ public class LineageResource { @Path("{guid}/schema") @Consumes(Servlets.JSON_MEDIA_TYPE) @Produces(Servlets.JSON_MEDIA_TYPE) - public Response schema(@PathParam("guid") String guid) { + public SchemaResponse schema(@PathParam("guid") String guid) { if (LOG.isDebugEnabled()) { LOG.debug("==> LineageResource.schema({})", guid); } AtlasPerfTracer perf = null; + SchemaResponse ret = new SchemaResponse(); + try { if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, "LineageResource.schema(" + guid + ")"); } - final String jsonResult = lineageService.getSchemaForEntity(guid); + SchemaResponse.SchemaDetails schemaDetails = atlasLineageService.getSchemaForHiveTableByGuid(guid); - JSONObject response = new JSONObject(); - response.put(AtlasClient.REQUEST_ID, Servlets.getRequestId()); - response.put(AtlasClient.RESULTS, new JSONObject(jsonResult)); - return Response.ok(response).build(); - } catch (SchemaNotFoundException e) { - LOG.error("schema not found for {}", guid); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (EntityNotFoundException e) { - LOG.error("table entity not found for {}", guid); - throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.NOT_FOUND)); - } catch (DiscoveryException | IllegalArgumentException e) { + ret.setRequestId(Servlets.getRequestId()); + ret.setResults(schemaDetails); + return ret; + } catch (IllegalArgumentException e) { LOG.error("Unable to get schema for entity guid={}", guid, e); throw new WebApplicationException(Servlets.getErrorResponse(e, Response.Status.BAD_REQUEST)); } catch (WebApplicationException e) {