http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 3bc4fba..bf6a36c 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -21,22 +21,20 @@ package org.apache.atlas.hook; import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.security.InMemoryJAASConfiguration; -import org.apache.atlas.type.AtlasType; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; -import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; @@ -102,9 +100,9 @@ public abstract class AtlasHook { protected abstract String getNumberOfRetriesPropertyKey(); protected void notifyEntities(String user, List<Referenceable> entities) { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); - hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entities)); - notifyEntities(hookNotificationMessages); + List<HookNotification> hookNotifications = new ArrayList<>(); + hookNotifications.add(new EntityCreateRequest(user, entities)); + notifyEntities(hookNotifications); } /** @@ -116,12 +114,12 @@ public abstract class AtlasHook { * @param messages hook notification messages * @param maxRetries maximum number of retries while sending message to messaging system */ - public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) { + public static void notifyEntities(List<HookNotification> messages, int maxRetries) { notifyEntitiesInternal(messages, maxRetries, notificationInterface, logFailedMessages, failedMessagesLogger); } @VisibleForTesting - static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries, + static void notifyEntitiesInternal(List<HookNotification> messages, int maxRetries, NotificationInterface notificationInterface, boolean shouldLogFailedMessages, FailedMessagesLogger logger) { if (messages == null || messages.isEmpty()) { @@ -168,7 +166,7 @@ public abstract class AtlasHook { * * @param messages hook notification messages */ - protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) { + protected void notifyEntities(List<HookNotification> messages) { final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3); notifyEntities(messages, maxRetries); }
http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 975967d..6caf7e2 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -17,16 +17,9 @@ */ package org.apache.atlas.notification; -import com.google.gson.reflect.TypeToken; -import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.entity.EntityMessageDeserializer; import org.apache.atlas.notification.hook.HookMessageDeserializer; -import org.apache.atlas.v1.model.notification.EntityNotification; -import org.apache.atlas.v1.model.notification.HookNotification; -import org.codehaus.jackson.type.TypeReference; -import scala.reflect.internal.Types; -import java.lang.reflect.Type; import java.util.List; /** @@ -46,19 +39,6 @@ public interface NotificationInterface { String PROPERTY_PREFIX = "atlas.notification"; /** - * Notification message class types. - */ - Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS = - HookNotification.HookNotificationMessage.class; - - Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class; - - /** - * Versioned notification message class types. - */ - Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType(); - - /** * Atlas notification types. */ enum NotificationType { http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java index f1e1992..fa160cf 100644 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java @@ -19,9 +19,9 @@ package org.apache.atlas.notification.entity; import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.EntityNotification; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.v1.model.notification.EntityNotification; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java index 6dff821..cab442f 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java @@ -19,9 +19,9 @@ package org.apache.atlas.notification.hook; import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ import org.slf4j.LoggerFactory; /** * Hook notification message deserializer. */ -public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotificationMessage> { +public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification> { /** * Logger for hook notification messages. @@ -45,14 +45,14 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot * Create a hook notification message deserializer. */ public HookMessageDeserializer() { - super(new TypeReference<HookNotificationMessage>() {}, - new TypeReference<AtlasNotificationMessage<HookNotificationMessage>>() {}, + super(new TypeReference<HookNotification>() {}, + new TypeReference<AtlasNotificationMessage<HookNotification>>() {}, AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } @Override - public HookNotificationMessage deserialize(String messageJson) { - final HookNotificationMessage ret = super.deserialize(messageJson); + public HookNotification deserialize(String messageJson) { + final HookNotification ret = super.deserialize(messageJson); if (ret != null) { ret.normalize(); http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index 9ce2a50..0a0620f 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -18,9 +18,10 @@ package org.apache.atlas.hook; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; @@ -51,41 +52,41 @@ public class AtlasHookTest { @Test (timeOut = 10000) public void testNotifyEntitiesDoesNotHangOnException() throws Exception { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 0, notificationInterface, false, failedMessagesLogger); // if we've reached here, the method finished OK. } @Test public void testNotifyEntitiesRetriesOnException() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false, failedMessagesLogger); verify(notificationInterface, times(2)). - send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + send(NotificationInterface.NotificationType.HOOK, hookNotifications); } @Test public void testFailedMessageIsLoggedIfRequired() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message"); @@ -93,11 +94,11 @@ public class AtlasHookTest { @Test public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger); @@ -105,15 +106,15 @@ public class AtlasHookTest { @Test public void testAllFailedMessagesAreLogged() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message1"); @@ -122,10 +123,10 @@ public class AtlasHookTest { @Test public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new RuntimeException("test message")).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger); http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index f1fc741..2e8abd7 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -18,13 +18,14 @@ package org.apache.atlas.kafka; -import kafka.message.MessageAndMetadata; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.notification.*; +import org.apache.atlas.notification.IncompatibleVersionException; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.entity.EntityNotificationTest; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.type.AtlasType; import org.apache.atlas.model.notification.MessageVersion; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -32,21 +33,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.codehaus.jettison.json.JSONException; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; -import java.util.LinkedList; import java.util.List; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -56,7 +51,6 @@ import static org.testng.Assert.*; * KafkaConsumer tests. */ public class KafkaConsumerTest { - private static final String TRAIT_NAME = "MyTrait"; @@ -71,88 +65,62 @@ public class KafkaConsumerTest { @Test public void testReceive() throws Exception { - - - MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); - + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); + TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp, klist); + ConsumerRecords records = new ConsumerRecords(mp); when(kafkaConsumer.poll(100)).thenReturn(records); - when(messageAndMetadata.message()).thenReturn(json); + kafkaConsumer.assign(Collections.singletonList(tp)); + + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); + List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); - AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L); - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); - HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); + HookNotification consumedMessage = messageList.get(0).getMessage(); assertMessagesEqual(message, consumedMessage, entity); - } @Test public void testNextVersionMismatch() throws Exception { + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp,klist); + ConsumerRecords records = new ConsumerRecords(mp); - MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); + kafkaConsumer.assign(Collections.singletonList(tp)); when(kafkaConsumer.poll(100L)).thenReturn(records); - when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer ,false, 100L); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L); + try { - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); + List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); + assertTrue(messageList.size() > 0); - HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); + HookNotification consumedMessage = messageList.get(0).getMessage(); fail("Expected VersionMismatchException!"); } catch (IncompatibleVersionException e) { e.printStackTrace(); } - } @Test public void testCommitIsCalledIfAutoCommitDisabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -161,10 +129,8 @@ public class KafkaConsumerTest { @Test public void testCommitIsNotCalledIfAutoCommitEnabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, true , 100L); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L); consumer.commit(tp, 1); @@ -172,26 +138,21 @@ public class KafkaConsumerTest { } private Referenceable getEntity(String traitName) { - Referenceable entity = EntityNotificationTest.getEntity("id"); - List<Struct> traitInfo = new LinkedList<>(); - Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); - traitInfo.add(trait); - return entity; + return EntityNotificationTest.getEntity("id", new Struct(traitName, Collections.<String, Object>emptyMap())); } - private void assertMessagesEqual(HookNotification.EntityUpdateRequest message, - HookNotification.HookNotificationMessage consumedMessage, - Referenceable entity) throws JSONException { - + private void assertMessagesEqual(EntityUpdateRequest message, + HookNotification consumedMessage, + Referenceable entity) { assertEquals(consumedMessage.getType(), message.getType()); assertEquals(consumedMessage.getUser(), message.getUser()); - assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest); + assertTrue(consumedMessage instanceof EntityUpdateRequest); - HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest = - (HookNotification.EntityUpdateRequest) consumedMessage; + EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) consumedMessage; Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); + assertEquals(deserializedEntity.getId(), entity.getId()); assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); assertEquals(deserializedEntity.getTraits(), entity.getTraits()); http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index fe019e1..e0655f3 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -22,25 +22,25 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; +import org.apache.atlas.model.notification.HookNotification; import java.util.List; import static org.testng.Assert.assertEquals; public class KafkaNotificationTest { - private KafkaNotification kafkaNotification; @BeforeClass public void setup() throws Exception { Configuration properties = ApplicationProperties.get(); + properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); kafkaNotification = new KafkaNotification(properties); @@ -55,29 +55,27 @@ public class KafkaNotificationTest { @Test public void testReceiveKafkaMessages() throws Exception { - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u1", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u2", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u3", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u4", new Referenceable("type"))); - - NotificationConsumer<Object> consumer = - kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - List<AtlasKafkaMessage<Object>> messages = null ; - long startTime = System.currentTimeMillis(); //fetch starting time + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u1", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u2", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u3", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u4", new Referenceable("type"))); + + NotificationConsumer<Object> consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + List<AtlasKafkaMessage<Object>> messages = null ; + long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { messages = consumer.receive(); + if (messages.size() > 0) { break; } } - int i=1; + int i = 1; for (AtlasKafkaMessage<Object> msg : messages){ - HookNotification.HookNotificationMessage message = (HookNotificationMessage) msg.getMessage(); + HookNotification message = (HookNotification) msg.getMessage(); + assertEquals(message.getUser(), "u"+i++); } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 98d7d2c..94cb70d 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -19,13 +19,14 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.model.notification.HookNotification.HookNotificationType; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.type.AtlasType; -import org.apache.atlas.v1.model.notification.HookNotification; import org.apache.commons.configuration.Configuration; -import org.testng.annotations.Test; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -37,62 +38,56 @@ import static org.testng.Assert.*; */ public class AbstractNotificationTest { - @Test + @org.testng.annotations.Test public void testSend() throws Exception { - Configuration configuration = mock(Configuration.class); + Configuration configuration = mock(Configuration.class); + TestNotification notification = new TestNotification(configuration); + Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1"); + Test message2 = new Test(HookNotificationType.TYPE_CREATE, "user1"); + Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1"); + List<String> messageJson = new ArrayList<>(); - TestNotification notification = new TestNotification(configuration); - - TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1"); - TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); - TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); - - List<String> messageJson = new ArrayList<>(); AbstractNotification.createNotificationMessages(message1, messageJson); AbstractNotification.createNotificationMessages(message2, messageJson); AbstractNotification.createNotificationMessages(message3, messageJson); - notification.send(NotificationInterface.NotificationType.HOOK, message1, message2, message3); + notification.send(NotificationType.HOOK, message1, message2, message3); - assertEquals(NotificationInterface.NotificationType.HOOK, notification.type); + assertEquals(NotificationType.HOOK, notification.type); assertEquals(3, notification.messages.size()); + for (int i = 0; i < notification.messages.size(); i++) { assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i)); } } - @Test + @org.testng.annotations.Test public void testSend2() throws Exception { - Configuration configuration = mock(Configuration.class); - - TestNotification notification = new TestNotification(configuration); + Configuration configuration = mock(Configuration.class); + TestNotification notification = new TestNotification(configuration); + Test message1 = new Test(HookNotificationType.ENTITY_CREATE, "user1"); + Test message2 = new Test(HookNotificationType.TYPE_CREATE, "user1"); + Test message3 = new Test(HookNotificationType.ENTITY_FULL_UPDATE, "user1"); + List<Test> messages = Arrays.asList(message1, message2, message3); + List<String> messageJson = new ArrayList<>(); - TestMessage message1 = new TestMessage(HookNotification.HookNotificationType.ENTITY_CREATE, "user1"); - TestMessage message2 = new TestMessage(HookNotification.HookNotificationType.TYPE_CREATE, "user1"); - TestMessage message3 = new TestMessage(HookNotification.HookNotificationType.ENTITY_FULL_UPDATE, "user1"); - - List<TestMessage> messages = new LinkedList<>(); - messages.add(message1); - messages.add(message2); - messages.add(message3); - - List<String> messageJson = new ArrayList<>(); AbstractNotification.createNotificationMessages(message1, messageJson); AbstractNotification.createNotificationMessages(message2, messageJson); AbstractNotification.createNotificationMessages(message3, messageJson); notification.send(NotificationInterface.NotificationType.HOOK, messages); - assertEquals(notification.type, NotificationInterface.NotificationType.HOOK); + assertEquals(notification.type, NotificationType.HOOK); assertEquals(notification.messages.size(), messageJson.size()); + for (int i = 0; i < notification.messages.size(); i++) { assertEqualsMessageJson(notification.messages.get(i), messageJson.get(i)); } } - public static class TestMessage extends HookNotification.HookNotificationMessage { + public static class Test extends HookNotification { - public TestMessage(HookNotification.HookNotificationType type, String user) { + public Test(HookNotificationType type, String user) { super(type, user); } } @@ -120,7 +115,7 @@ public class AbstractNotificationTest { protected void sendInternal(NotificationType notificationType, List<String> notificationMessages) throws NotificationException { - type = notificationType; + type = notificationType; messages = notificationMessages; } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java deleted file mode 100644 index ddb63b5..0000000 --- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification.entity; - -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.v1.model.notification.EntityNotification; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -import static org.testng.Assert.assertEquals; - -/** - * EntityMessageDeserializer tests. - */ -public class EntityMessageDeserializerTest { - private EntityMessageDeserializer deserializer = new EntityMessageDeserializer(); - - @Test - public void testDeserialize() throws Exception { - Referenceable entity = EntityNotificationTest.getEntity("id"); - String traitName = "MyTrait"; - List<Struct> traitInfo = new LinkedList<>(); - Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); - traitInfo.add(trait); - - EntityNotification notification = - new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); - - List<String> jsonMsgList = new ArrayList<>(); - - AbstractNotification.createNotificationMessages(notification, jsonMsgList); - - EntityNotification deserializedNotification = null; - - for (String jsonMsg : jsonMsgList) { - deserializedNotification = deserializer.deserialize(jsonMsg); - - if (deserializedNotification != null) { - break; - } - } - - assertEquals(deserializedNotification.getOperationType(), notification.getOperationType()); - assertEquals(deserializedNotification.getEntity().getId(), notification.getEntity().getId()); - assertEquals(deserializedNotification.getEntity().getTypeName(), notification.getEntity().getTypeName()); - assertEquals(deserializedNotification.getEntity().getTraits(), notification.getEntity().getTraits()); - assertEquals(deserializedNotification.getEntity().getTrait(traitName), - notification.getEntity().getTrait(traitName)); - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java new file mode 100644 index 0000000..13eafb6 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationDeserializerTest.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.entity; + +import org.apache.atlas.model.notification.EntityNotification; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +/** + * EntityMessageDeserializer tests. + */ +public class EntityNotificationDeserializerTest { + private EntityMessageDeserializer deserializer = new EntityMessageDeserializer(); + + @Test + public void testDeserialize() throws Exception { + Referenceable entity = EntityNotificationTest.getEntity("id"); + String traitName = "MyTrait"; + List<Struct> traits = Collections.singletonList(new Struct(traitName, Collections.<String, Object>emptyMap())); + EntityNotificationV1 notification = new EntityNotificationV1(entity, EntityNotificationV1.OperationType.TRAIT_ADD, traits); + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(notification, jsonMsgList); + + EntityNotification deserializedNotification = null; + + for (String jsonMsg : jsonMsgList) { + deserializedNotification = deserializer.deserialize(jsonMsg); + + if (deserializedNotification != null) { + break; + } + } + + assertTrue(deserializedNotification instanceof EntityNotificationV1); + + EntityNotificationV1 entityNotificationV1 = (EntityNotificationV1)deserializedNotification; + + assertEquals(entityNotificationV1.getOperationType(), notification.getOperationType()); + assertEquals(entityNotificationV1.getEntity().getId(), notification.getEntity().getId()); + assertEquals(entityNotificationV1.getEntity().getTypeName(), notification.getEntity().getTypeName()); + assertEquals(entityNotificationV1.getEntity().getTraits(), notification.getEntity().getTraits()); + assertEquals(entityNotificationV1.getEntity().getTrait(traitName), notification.getEntity().getTrait(traitName)); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java index cedfc01..232b21d 100644 --- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityNotificationTest.java @@ -22,7 +22,8 @@ import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.v1.model.notification.EntityNotification; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType; import org.testng.annotations.Test; import java.util.Collections; @@ -38,62 +39,48 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; /** - * EntityNotification tests. + * EntityNotificationV1 tests. */ public class EntityNotificationTest { @Test public void testGetEntity() throws Exception { - Referenceable entity = getEntity("id"); - - EntityNotification entityNotification = - new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE, - Collections.<Struct>emptyList()); + Referenceable entity = getEntity("id"); + EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList()); assertEquals(entity, entityNotification.getEntity()); } @Test public void testGetOperationType() throws Exception { - Referenceable entity = getEntity("id"); - - EntityNotification entityNotification = - new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE, - Collections.<Struct>emptyList()); + Referenceable entity = getEntity("id"); + EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList()); - assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType()); + assertEquals(EntityNotificationV1.OperationType.ENTITY_CREATE, entityNotification.getOperationType()); } @Test public void testGetAllTraits() throws Exception { - Referenceable entity = getEntity("id"); - String traitName = "MyTrait"; - List<Struct> traitInfo = new LinkedList<>(); - Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); - traitInfo.add(trait); + Referenceable entity = getEntity("id"); + String traitName = "MyTrait"; + List<Struct> traitInfo = Collections.singletonList(new Struct(traitName, Collections.<String, Object>emptyMap())); - EntityNotification entityNotification = - new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); + EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, traitInfo); assertEquals(traitInfo, entityNotification.getAllTraits()); } @Test public void testGetAllTraitsSuperTraits() throws Exception { - AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class); - - String traitName = "MyTrait"; - Struct myTrait = new Struct(traitName); - - String superTraitName = "MySuperTrait"; - - AtlasClassificationType traitType = mock(AtlasClassificationType.class); - Set<String> superTypeNames = Collections.singleton(superTraitName); - - AtlasClassificationType superTraitType = mock(AtlasClassificationType.class); - Set<String> superSuperTypeNames = Collections.emptySet(); - - Referenceable entity = getEntity("id", myTrait); + AtlasTypeRegistry typeRegistry = mock(AtlasTypeRegistry.class); + String traitName = "MyTrait"; + Struct myTrait = new Struct(traitName); + String superTraitName = "MySuperTrait"; + AtlasClassificationType traitType = mock(AtlasClassificationType.class); + Set<String> superTypeNames = Collections.singleton(superTraitName); + AtlasClassificationType superTraitType = mock(AtlasClassificationType.class); + Set<String> superSuperTypeNames = Collections.emptySet(); + Referenceable entity = getEntity("id", myTrait); when(typeRegistry.getClassificationTypeByName(traitName)).thenReturn(traitType); when(typeRegistry.getClassificationTypeByName(superTraitName)).thenReturn(superTraitType); @@ -101,8 +88,7 @@ public class EntityNotificationTest { when(traitType.getAllSuperTypes()).thenReturn(superTypeNames); when(superTraitType.getAllSuperTypes()).thenReturn(superSuperTypeNames); - EntityNotification entityNotification = - new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, typeRegistry); + EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.TRAIT_ADD, typeRegistry); List<Struct> allTraits = entityNotification.getAllTraits(); @@ -110,32 +96,25 @@ public class EntityNotificationTest { for (Struct trait : allTraits) { String typeName = trait.getTypeName(); + assertTrue(typeName.equals(traitName) || typeName.equals(superTraitName)); } } @Test public void testEquals() throws Exception { - Referenceable entity = getEntity("id"); - - EntityNotification entityNotification2 = - new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE, - Collections.<Struct>emptyList()); - - EntityNotification entityNotification = - new EntityNotification(entity, EntityNotification.OperationType.ENTITY_CREATE, - Collections.<Struct>emptyList()); + Referenceable entity = getEntity("id"); + EntityNotificationV1 entityNotification2 = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList()); + EntityNotificationV1 entityNotification = new EntityNotificationV1(entity, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList()); assertTrue(entityNotification.equals(entityNotification2)); assertTrue(entityNotification2.equals(entityNotification)); } public static Referenceable getEntity(String id, Struct... traits) { - String typeName = "typeName"; - Map<String, Object> values = new HashMap<>(); - - List<String> traitNames = new LinkedList<>(); - Map<String, Struct> traitMap = new HashMap<>(); + String typeName = "typeName"; + List<String> traitNames = new LinkedList<>(); + Map<String, Struct> traitMap = new HashMap<>(); for (Struct trait : traits) { String traitName = trait.getTypeName(); @@ -143,6 +122,7 @@ public class EntityNotificationTest { traitNames.add(traitName); traitMap.put(traitName, trait); } - return new Referenceable(id, typeName, values, traitNames, traitMap); + + return new Referenceable(id, typeName, new HashMap<String, Object>(), traitNames, traitMap); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java deleted file mode 100644 index 17facf8..0000000 --- a/notification/src/test/java/org/apache/atlas/notification/hook/HookMessageDeserializerTest.java +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification.hook; - -import org.apache.atlas.notification.entity.EntityNotificationTest; -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; -import org.apache.atlas.type.AtlasType; -import org.apache.commons.lang3.RandomStringUtils; -import org.testng.annotations.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; - -/** - * HookMessageDeserializer tests. - */ -public class HookMessageDeserializerTest { - private HookMessageDeserializer deserializer = new HookMessageDeserializer(); - - @Test - public void testDeserialize() throws Exception { - Referenceable entity = generateEntityWithTrait(); - EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); - - List<String> jsonMsgList = new ArrayList<>(); - - AbstractNotification.createNotificationMessages(message, jsonMsgList); - - HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); - - assertEqualMessage(deserializedMessage, message); - } - - // validate deserialization of legacy message, which doesn't use MessageVersion - @Test - public void testDeserializeLegacyMessage() throws Exception { - Referenceable entity = generateEntityWithTrait(); - EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); - - String jsonMsg = AtlasType.toV1Json(message); - HookNotificationMessage deserializedMessage = deserialize(Collections.singletonList(jsonMsg)); - - assertEqualMessage(deserializedMessage, message); - } - - @Test - public void testDeserializeCompressedMessage() throws Exception { - Referenceable entity = generateLargeEntityWithTrait(); - EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); - - List<String> jsonMsgList = new ArrayList<>(); - - AbstractNotification.createNotificationMessages(message, jsonMsgList); - - assertTrue(jsonMsgList.size() == 1); - - String compressedMsg = jsonMsgList.get(0); - String uncompressedMsg = AtlasType.toV1Json(message); - - assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")"); - - HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); - - assertEqualMessage(deserializedMessage, message); - } - - @Test - public void testDeserializeSplitMessage() throws Exception { - Referenceable entity = generateVeryLargeEntityWithTrait(); - EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); - - List<String> jsonMsgList = new ArrayList<>(); - - AbstractNotification.createNotificationMessages(message, jsonMsgList); - - assertTrue(jsonMsgList.size() > 1); - - HookNotificationMessage deserializedMessage = deserialize(jsonMsgList); - - assertEqualMessage(deserializedMessage, message); - } - - private Referenceable generateEntityWithTrait() { - Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); - - return ret; - } - - private HookNotificationMessage deserialize(List<String> jsonMsgList) { - HookNotificationMessage deserializedMessage = null; - - for (String jsonMsg : jsonMsgList) { - deserializedMessage = deserializer.deserialize(jsonMsg); - - if (deserializedMessage != null) { - break; - } - } - - return deserializedMessage; - } - - private void assertEqualMessage(HookNotificationMessage deserializedMessage, EntityUpdateRequest message) throws Exception { - assertNotNull(deserializedMessage); - assertEquals(deserializedMessage.getType(), message.getType()); - assertEquals(deserializedMessage.getUser(), message.getUser()); - - assertTrue(deserializedMessage instanceof EntityUpdateRequest); - - EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage; - Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); - Referenceable entity = message.getEntities().get(0); - String traitName = entity.getTraitNames().get(0); - - assertEquals(deserializedEntity.getId(), entity.getId()); - assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); - assertEquals(deserializedEntity.getTraits(), entity.getTraits()); - assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode()); - - } - - private Referenceable generateLargeEntityWithTrait() { - Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); - - // add 100 attributes, each with value of size 10k - // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split - String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression - for (int i = 0; i < 100; i++) { - ret.set("attr_" + i, attrValue); - } - - return ret; - } - - private Referenceable generateVeryLargeEntityWithTrait() { - Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); - - // add 300 attributes, each with value of size 10k - // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split - for (int i = 0; i < 300; i++) { - ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024)); - } - - return ret; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java new file mode 100644 index 0000000..d048170 --- /dev/null +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationDeserializerTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.hook; + +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.entity.EntityNotificationTest; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; +import org.apache.atlas.type.AtlasType; +import org.apache.commons.lang3.RandomStringUtils; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +/** + * HookMessageDeserializer tests. + */ +public class HookNotificationDeserializerTest { + private HookMessageDeserializer deserializer = new HookMessageDeserializer(); + + @Test + public void testDeserialize() throws Exception { + Referenceable entity = generateEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); + + HookNotification deserializedMessage = deserialize(jsonMsgList); + + assertEqualMessage(deserializedMessage, message); + } + + // validate deserialization of legacy message, which doesn't use MessageVersion + @Test + public void testDeserializeLegacyMessage() throws Exception { + Referenceable entity = generateEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String jsonMsg = AtlasType.toV1Json(message); + HookNotification deserializedMessage = deserialize(Collections.singletonList(jsonMsg)); + + assertEqualMessage(deserializedMessage, message); + } + + @Test + public void testDeserializeCompressedMessage() throws Exception { + Referenceable entity = generateLargeEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); + + assertTrue(jsonMsgList.size() == 1); + + String compressedMsg = jsonMsgList.get(0); + String uncompressedMsg = AtlasType.toV1Json(message); + + assertTrue(compressedMsg.length() < uncompressedMsg.length(), "Compressed message (" + compressedMsg.length() + ") should be shorter than uncompressed message (" + uncompressedMsg.length() + ")"); + + HookNotification deserializedMessage = deserialize(jsonMsgList); + + assertEqualMessage(deserializedMessage, message); + } + + @Test + public void testDeserializeSplitMessage() throws Exception { + Referenceable entity = generateVeryLargeEntityWithTrait(); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + List<String> jsonMsgList = new ArrayList<>(); + + AbstractNotification.createNotificationMessages(message, jsonMsgList); + + assertTrue(jsonMsgList.size() > 1); + + HookNotification deserializedMessage = deserialize(jsonMsgList); + + assertEqualMessage(deserializedMessage, message); + } + + private Referenceable generateEntityWithTrait() { + Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + return ret; + } + + private HookNotification deserialize(List<String> jsonMsgList) { + HookNotification deserializedMessage = null; + + for (String jsonMsg : jsonMsgList) { + deserializedMessage = deserializer.deserialize(jsonMsg); + + if (deserializedMessage != null) { + break; + } + } + + return deserializedMessage; + } + + private void assertEqualMessage(HookNotification deserializedMessage, EntityUpdateRequest message) throws Exception { + assertNotNull(deserializedMessage); + assertEquals(deserializedMessage.getType(), message.getType()); + assertEquals(deserializedMessage.getUser(), message.getUser()); + + assertTrue(deserializedMessage instanceof EntityUpdateRequest); + + EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) deserializedMessage; + Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); + Referenceable entity = message.getEntities().get(0); + String traitName = entity.getTraitNames().get(0); + + assertEquals(deserializedEntity.getId(), entity.getId()); + assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); + assertEquals(deserializedEntity.getTraits(), entity.getTraits()); + assertEquals(deserializedEntity.getTrait(traitName).hashCode(), entity.getTrait(traitName).hashCode()); + + } + + private Referenceable generateLargeEntityWithTrait() { + Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + // add 100 attributes, each with value of size 10k + // Json Size=1,027,984; GZipped Size=16,387 ==> will compress, but not split + String attrValue = RandomStringUtils.randomAlphanumeric(10 * 1024); // use the same value for all attributes - to aid better compression + for (int i = 0; i < 100; i++) { + ret.set("attr_" + i, attrValue); + } + + return ret; + } + + private Referenceable generateVeryLargeEntityWithTrait() { + Referenceable ret = EntityNotificationTest.getEntity("id", new Struct("MyTrait", Collections.<String, Object>emptyMap())); + + // add 300 attributes, each with value of size 10k + // Json Size=3,082,384; GZipped Size=2,313,357 ==> will compress & split + for (int i = 0; i < 300; i++) { + ret.set("attr_" + i, RandomStringUtils.randomAlphanumeric(10 * 1024)); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java index a8d4926..cf691af 100644 --- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java @@ -17,14 +17,15 @@ */ package org.apache.atlas.notification.hook; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.type.AtlasType; -import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationType; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class HookNotificationTest { @@ -37,18 +38,21 @@ public class HookNotificationTest { entity1.set("complex", new Referenceable("othertype")); Referenceable entity2 = new Referenceable("newtype"); String user = "user"; - EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2); - String notificationJson = AtlasType.toV1Json(request); - HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson); + EntityCreateRequest request = new EntityCreateRequest(user, entity1, entity2); + String notificationJson = AtlasType.toV1Json(request); + HookNotification actualNotification = deserializer.deserialize(notificationJson); assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE); assertEquals(actualNotification.getUser(), user); + assertTrue(actualNotification instanceof EntityCreateRequest); EntityCreateRequest createRequest = (EntityCreateRequest) actualNotification; + assertEquals(createRequest.getEntities().size(), 2); Referenceable actualEntity1 = createRequest.getEntities().get(0); + assertEquals(actualEntity1.getTypeName(), "sometype"); assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype"); assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype"); @@ -59,9 +63,10 @@ public class HookNotificationTest { //Code to generate the json, use it for hard-coded json used later in this test Referenceable entity = new Referenceable("sometype"); entity.set("attr", "value"); - EntityCreateRequest request = new EntityCreateRequest(null, entity); - String notificationJsonFromCode = AtlasType.toV1Json(request); + EntityCreateRequest request = new EntityCreateRequest(null, entity); + String notificationJsonFromCode = AtlasType.toV1Json(request); + System.out.println(notificationJsonFromCode); //Json without user and assert that the string can be deserialised @@ -88,9 +93,9 @@ public class HookNotificationTest { + "}"; - HookNotificationMessage actualNotification = deserializer.deserialize(notificationJson); + HookNotification actualNotification = deserializer.deserialize(notificationJson); assertEquals(actualNotification.getType(), HookNotificationType.ENTITY_CREATE); - assertEquals(actualNotification.getUser(), HookNotificationMessage.UNKNOW_USER); + assertEquals(actualNotification.getUser(), HookNotification.UNKNOW_USER); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java index acbc996..4633de9 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationEntityChangeListener.java @@ -21,9 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.v1.model.notification.EntityNotification; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType; import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.type.AtlasClassificationType; import org.apache.atlas.type.AtlasTypeRegistry; @@ -69,32 +71,32 @@ public class NotificationEntityChangeListener implements EntityChangeListener { @Override public void onEntitiesAdded(Collection<Referenceable> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_CREATE); + notifyOfEntityEvent(entities, OperationType.ENTITY_CREATE); } @Override public void onEntitiesUpdated(Collection<Referenceable> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_UPDATE); + notifyOfEntityEvent(entities, OperationType.ENTITY_UPDATE); } @Override public void onTraitsAdded(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_ADD); + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_ADD); } @Override public void onTraitsDeleted(Referenceable entity, Collection<String> traitNames) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_DELETE); + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_DELETE); } @Override public void onTraitsUpdated(Referenceable entity, Collection<? extends Struct> traits) throws AtlasException { - notifyOfEntityEvent(Collections.singleton(entity), EntityNotification.OperationType.TRAIT_UPDATE); + notifyOfEntityEvent(Collections.singleton(entity), OperationType.TRAIT_UPDATE); } @Override public void onEntitiesDeleted(Collection<Referenceable> entities, boolean isImport) throws AtlasException { - notifyOfEntityEvent(entities, EntityNotification.OperationType.ENTITY_DELETE); + notifyOfEntityEvent(entities, OperationType.ENTITY_DELETE); } @@ -145,8 +147,8 @@ public class NotificationEntityChangeListener implements EntityChangeListener { // send notification of entity change private void notifyOfEntityEvent(Collection<Referenceable> entityDefinitions, - EntityNotification.OperationType operationType) throws AtlasException { - List<EntityNotification> messages = new LinkedList<>(); + OperationType operationType) throws AtlasException { + List<EntityNotificationV1> messages = new ArrayList<>(); for (Referenceable entityDefinition : entityDefinitions) { if(GraphHelper.isInternalType(entityDefinition.getTypeName())) { @@ -165,13 +167,13 @@ public class NotificationEntityChangeListener implements EntityChangeListener { } } - EntityNotification notification = new EntityNotification(entity, operationType, getAllTraits(entity, typeRegistry)); + EntityNotificationV1 notification = new EntityNotificationV1(entity, operationType, getAllTraits(entity, typeRegistry)); messages.add(notification); } if (!messages.isEmpty()) { - notificationInterface.send(NotificationInterface.NotificationType.ENTITIES, messages); + notificationInterface.send(NotificationType.ENTITIES, messages); } }