http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java deleted file mode 100644 index 3b377de..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - - -import org.apache.atlas.AtlasConfiguration; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.StringUtils; -import org.apache.commons.compress.utils.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - - -public class AtlasNotificationBaseMessage { - private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class); - - public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop; - public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean(); - - public enum CompressionKind { NONE, GZIP }; - - private MessageVersion version = null; - private String msgId = null; - private CompressionKind msgCompressionKind = CompressionKind.NONE; - private int msgSplitIdx = 1; - private int msgSplitCount = 1; - - - public AtlasNotificationBaseMessage() { - } - - public AtlasNotificationBaseMessage(MessageVersion version) { - this(version, null, CompressionKind.NONE); - } - - public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) { - this.version = version; - this.msgId = msgId; - this.msgCompressionKind = msgCompressionKind; - } - - public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) { - this.version = version; - this.msgId = msgId; - this.msgCompressionKind = msgCompressionKind; - this.msgSplitIdx = msgSplitIdx; - this.msgSplitCount = msgSplitCount; - } - - public void setVersion(MessageVersion version) { - this.version = version; - } - - public MessageVersion getVersion() { - return version; - } - - public String getMsgId() { - return msgId; - } - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - public CompressionKind getMsgCompressionKind() { - return msgCompressionKind; - } - - public void setMsgCompressed(CompressionKind msgCompressionKind) { - this.msgCompressionKind = msgCompressionKind; - } - - public int getMsgSplitIdx() { - return msgSplitIdx; - } - - public void setMsgSplitIdx(int msgSplitIdx) { - this.msgSplitIdx = msgSplitIdx; - } - - public int getMsgSplitCount() { - return msgSplitCount; - } - - public void setMsgSplitCount(int msgSplitCount) { - this.msgSplitCount = msgSplitCount; - } - - /** - * Compare the version of this message with the given version. - * - * @param compareToVersion the version to compare to - * - * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to, - * or greater than the given version. - */ - public int compareVersion(MessageVersion compareToVersion) { - return version.compareTo(compareToVersion); - } - - - public static byte[] getBytesUtf8(String str) { - return StringUtils.getBytesUtf8(str); - } - - public static String getStringUtf8(byte[] bytes) { - return StringUtils.newStringUtf8(bytes); - } - - public static byte[] encodeBase64(byte[] bytes) { - return Base64.encodeBase64(bytes); - } - - public static byte[] decodeBase64(byte[] bytes) { - return Base64.decodeBase64(bytes); - } - - public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) { - return encodeBase64(gzipCompress(bytes)); - } - - public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) { - return gzipUncompress(decodeBase64(bytes)); - } - - public static String gzipCompress(String str) { - byte[] bytes = getBytesUtf8(str); - byte[] compressedBytes = gzipCompress(bytes); - byte[] encodedBytes = encodeBase64(compressedBytes); - - return getStringUtf8(encodedBytes); - } - - public static String gzipUncompress(String str) { - byte[] encodedBytes = getBytesUtf8(str); - byte[] compressedBytes = decodeBase64(encodedBytes); - byte[] bytes = gzipUncompress(compressedBytes); - - return getStringUtf8(bytes); - } - - public static byte[] gzipCompress(byte[] content) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - try { - GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); - - gzipOutputStream.write(content); - gzipOutputStream.close(); - } catch (IOException e) { - LOG.error("gzipCompress(): error compressing {} bytes", content.length, e); - - throw new RuntimeException(e); - } - - return byteArrayOutputStream.toByteArray(); - } - - public static byte[] gzipUncompress(byte[] content) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - try { - IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out); - } catch (IOException e) { - LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e); - } - - return out.toByteArray(); - } -} -
http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java deleted file mode 100644 index 63d93c9..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import org.joda.time.DateTimeZone; -import org.joda.time.Instant; - -/** - * Represents a notification message that is associated with a version. - */ -public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { - private String msgSourceIP; - private String msgCreatedBy; - private long msgCreationTime; - - /** - * The actual message. - */ - private final T message; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a notification message. - * - * @param version the message version - * @param message the actual message - */ - public AtlasNotificationMessage(MessageVersion version, T message) { - this(version, message, null, null); - } - - public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { - super(version); - - this.msgSourceIP = msgSourceIP; - this.msgCreatedBy = createdBy; - this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis(); - this.message = message; - } - - - public String getMsgSourceIP() { - return msgSourceIP; - } - - public void setMsgSourceIP(String msgSourceIP) { - this.msgSourceIP = msgSourceIP; - } - - public String getMsgCreatedBy() { - return msgCreatedBy; - } - - public void setMsgCreatedBy(String msgCreatedBy) { - this.msgCreatedBy = msgCreatedBy; - } - - public long getMsgCreationTime() { - return msgCreationTime; - } - - public void setMsgCreationTime(long msgCreationTime) { - this.msgCreationTime = msgCreationTime; - } - - public T getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index 2a175ba..d6e6878 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -19,14 +19,17 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; import org.apache.commons.lang3.StringUtils; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -47,11 +50,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message public static final String VERSION_MISMATCH_MSG = "Notification message version mismatch. Expected %s but recieved %s. Message %s"; - private final Type notificationMessageType; - private final Type messageType; - private final MessageVersion expectedVersion; - private final Logger notificationLogger; - private final Gson gson; + private final TypeReference<T> messageType; + private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType; + private final MessageVersion expectedVersion; + private final Logger notificationLogger; private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>(); @@ -65,33 +67,40 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message /** * Create a notification message deserializer. * - * @param notificationMessageType the type of the notification message * @param expectedVersion the expected message version - * @param gson JSON serialization/deserialization * @param notificationLogger logger for message version mismatch */ - public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, - Gson gson, Logger notificationLogger) { - this(notificationMessageType, expectedVersion, gson, notificationLogger, + public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, + TypeReference<AtlasNotificationMessage<T>> notificationMessageType, + MessageVersion expectedVersion, Logger notificationLogger) { + this(messageType, notificationMessageType, expectedVersion, notificationLogger, NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000, NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000); } - public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, - Gson gson, Logger notificationLogger, + public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, + TypeReference<AtlasNotificationMessage<T>> notificationMessageType, + MessageVersion expectedVersion, + Logger notificationLogger, long splitMessageSegmentsWaitTimeMs, long splitMessageBufferPurgeIntervalMs) { + this.messageType = messageType; this.notificationMessageType = notificationMessageType; - this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; this.expectedVersion = expectedVersion; - this.gson = gson; this.notificationLogger = notificationLogger; this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs; this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs; } - // ----- MessageDeserializer --------------------------------------------- + public TypeReference<T> getMessageType() { + return messageType; + } + public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() { + return notificationMessageType; + } + + // ----- MessageDeserializer --------------------------------------------- @Override public T deserialize(String messageJson) { final T ret; @@ -99,15 +108,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message messageCountTotal.incrementAndGet(); messageCountSinceLastInterval.incrementAndGet(); - AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class); + AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class); if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage - ret = gson.fromJson(messageJson, messageType); + ret = AtlasType.fromV1Json(messageJson, messageType); } else { String msgJson = messageJson; if (msg.getMsgSplitCount() > 1) { // multi-part message - AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + AtlasNotificationStringMessage splitMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class); checkVersion(splitMsg, msgJson); @@ -184,7 +193,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length); } - msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class); + msg = AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class); } else { msg = null; } @@ -197,7 +206,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message if (msg != null) { if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) { - AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class); byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage()); byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); @@ -207,7 +216,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); } - AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType); + AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType); checkVersion(atlasNotificationMessage, msgJson); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java deleted file mode 100644 index 41485a0..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - - -public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage { - private String message = null; - - public AtlasNotificationStringMessage() { - super(AbstractNotification.CURRENT_MESSAGE_VERSION); - } - - public AtlasNotificationStringMessage(String message) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION); - - this.message = message; - } - - public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); - - this.message = message; - } - - public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); - - this.message = message; - } - - public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); - - this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes); - } - - public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); - - this.message = new String(encodedBytes, offset, length); - } - - public void setMessage(String message) { - this.message = message; - } - - public String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java deleted file mode 100644 index 7f96638..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import java.util.ArrayList; -import java.util.Arrays; - -/** - * Represents the version of a notification message. - */ -public class MessageVersion implements Comparable<MessageVersion> { - /** - * Used for message with no version (old format). - */ - public static final MessageVersion NO_VERSION = new MessageVersion("0"); - public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0"); - - public static final MessageVersion CURRENT_VERSION = VERSION_1; - - private final String version; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a message version. - * - * @param version the version string - */ - public MessageVersion(String version) { - this.version = version; - - try { - getVersionParts(); - } catch (NumberFormatException e) { - throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e); - } - } - - - // ----- Comparable ------------------------------------------------------ - - @Override - public int compareTo(MessageVersion that) { - if (that == null) { - return 1; - } - - Integer[] thisParts = getVersionParts(); - Integer[] thatParts = that.getVersionParts(); - - int length = Math.max(thisParts.length, thatParts.length); - - for (int i = 0; i < length; i++) { - - int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i); - - if (comp != 0) { - return comp; - } - } - return 0; - } - - - // ----- Object overrides ------------------------------------------------ - - @Override - public boolean equals(Object that) { - if (this == that){ - return true; - } - - if (that == null || getClass() != that.getClass()) { - return false; - } - - return compareTo((MessageVersion) that) == 0; - } - - @Override - public int hashCode() { - return Arrays.hashCode(getVersionParts()); - } - - - @Override - public String toString() { - return "MessageVersion[version=" + version + "]"; - } - - // ----- helper methods -------------------------------------------------- - - /** - * Get the version parts array by splitting the version string. - * Strip the trailing zeros (i.e. '1.0.0' equals '1'). - * - * @return the version parts array - */ - protected Integer[] getVersionParts() { - - String[] sParts = version.split("\\."); - ArrayList<Integer> iParts = new ArrayList<>(); - int trailingZeros = 0; - - for (String sPart : sParts) { - Integer iPart = new Integer(sPart); - - if (iPart == 0) { - ++trailingZeros; - } else { - for (int i = 0; i < trailingZeros; ++i) { - iParts.add(0); - } - trailingZeros = 0; - iParts.add(iPart); - } - } - return iParts.toArray(new Integer[iParts.size()]); - } - - private Integer getVersionPart(Integer[] versionParts, int i) { - return i < versionParts.length ? versionParts[i] : 0; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 8809225..6caf7e2 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -17,13 +17,9 @@ */ package org.apache.atlas.notification; -import com.google.gson.reflect.TypeToken; import org.apache.atlas.notification.entity.EntityMessageDeserializer; -import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.hook.HookMessageDeserializer; -import org.apache.atlas.notification.hook.HookNotification; -import java.lang.reflect.Type; import java.util.List; /** @@ -43,57 +39,22 @@ public interface NotificationInterface { String PROPERTY_PREFIX = "atlas.notification"; /** - * Notification message class types. - */ - Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS = - HookNotification.HookNotificationMessage.class; - - Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = EntityNotification.class; - - /** - * Versioned notification message class types. - */ - Type HOOK_VERSIONED_MESSAGE_TYPE = - new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType(); - - Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType(); - - /** * Atlas notification types. */ enum NotificationType { - // Notifications from the Atlas integration hooks. - HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()), + HOOK(new HookMessageDeserializer()), // Notifications to entity change consumers. - ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer()); - - - /** - * The notification class associated with this type. - */ - private final Class classType; - - /** - * The message deserializer for this type. - */ - private final MessageDeserializer deserializer; + ENTITIES(new EntityMessageDeserializer()); + private final AtlasNotificationMessageDeserializer deserializer; - NotificationType(Class classType, MessageDeserializer<?> deserializer) { - this.classType = classType; + NotificationType(AtlasNotificationMessageDeserializer deserializer) { this.deserializer = deserializer; } - - // ----- accessors --------------------------------------------------- - - public Class getClassType() { - return classType; - } - - public MessageDeserializer getDeserializer() { + public AtlasNotificationMessageDeserializer getDeserializer() { return deserializer; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java index 148b57f..10df121 100644 --- a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java +++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java @@ -18,6 +18,8 @@ package org.apache.atlas.notification; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; + public class SplitMessageAggregator { private final String msgId; private final AtlasNotificationStringMessage[] splitMessagesBuffer; http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java index a6f7e64..fa160cf 100644 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java @@ -18,19 +18,14 @@ package org.apache.atlas.notification.entity; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.EntityNotification; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.NotificationInterface; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.Map; - /** * Entity notification message deserializer. */ @@ -48,29 +43,19 @@ public class EntityMessageDeserializer extends AbstractMessageDeserializer<Entit * Create an entity notification message deserializer. */ public EntityMessageDeserializer() { - super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE, - AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); - } - - - // ----- helper methods -------------------------------------------------- - - private static Map<Type, JsonDeserializer> getDeserializerMap() { - return Collections.<Type, JsonDeserializer>singletonMap( - NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer()); + super(new TypeReference<EntityNotification>() {}, + new TypeReference<AtlasNotificationMessage<EntityNotification>>() {}, + AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } + @Override + public EntityNotification deserialize(String messageJson) { + final EntityNotification ret = super.deserialize(messageJson); - // ----- deserializer classes -------------------------------------------- - - /** - * Deserializer for EntityNotification. - */ - protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> { - @Override - public EntityNotification deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - return context.deserialize(json, EntityNotificationImpl.class); + if (ret != null) { + ret.normalize(); } + + return ret; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java deleted file mode 100644 index 379e815..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.entity; - -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; - -import java.util.List; - -/** - * Notification of entity changes. - */ -public interface EntityNotification { - - /** - * Operations that result in an entity notification. - */ - enum OperationType { - ENTITY_CREATE, - ENTITY_UPDATE, - ENTITY_DELETE, - TRAIT_ADD, - TRAIT_DELETE, - TRAIT_UPDATE - } - - - // ----- EntityNotification ------------------------------------------------ - - /** - * Get the entity that is associated with this notification. - * - * @return the associated entity - */ - IReferenceableInstance getEntity(); - - /** - * Get flattened list of traits that are associated with this entity (includes super traits). - * - * @return the list of all traits - */ - List<IStruct> getAllTraits(); - - /** - * Get the type of operation that triggered this notification. - * - * @return the operation type - */ - OperationType getOperationType(); -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java deleted file mode 100644 index 6a9b362..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.entity; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.typesystem.IReferenceableInstance; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; -import org.apache.atlas.typesystem.types.FieldMapping; -import org.apache.atlas.typesystem.types.TraitType; -import org.apache.atlas.typesystem.types.TypeSystem; - -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -/** - * Entity notification implementation. - */ -public class EntityNotificationImpl implements EntityNotification { - - private final Referenceable entity; - private final OperationType operationType; - private final List<IStruct> traits; - - - // ----- Constructors ------------------------------------------------------ - - /** - * No-arg constructor for serialization. - */ - @SuppressWarnings("unused") - private EntityNotificationImpl() throws AtlasException { - this(null, OperationType.ENTITY_CREATE, Collections.<IStruct>emptyList()); - } - - /** - * Construct an EntityNotification. - * - * @param entity the entity subject of the notification - * @param operationType the type of operation that caused the notification - * @param traits the traits for the given entity - * - * @throws AtlasException if the entity notification can not be created - */ - public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<IStruct> traits) - throws AtlasException { - this.entity = entity; - this.operationType = operationType; - this.traits = traits; - } - - /** - * Construct an EntityNotification. - * - * @param entity the entity subject of the notification - * @param operationType the type of operation that caused the notification - * @param typeSystem the Atlas type system - * - * @throws AtlasException if the entity notification can not be created - */ - public EntityNotificationImpl(Referenceable entity, OperationType operationType, TypeSystem typeSystem) - throws AtlasException { - this(entity, operationType, getAllTraits(entity, typeSystem)); - } - - - // ----- EntityNotification ------------------------------------------------ - - @Override - public IReferenceableInstance getEntity() { - return entity; - } - - @Override - public List<IStruct> getAllTraits() { - return traits; - } - - @Override - public OperationType getOperationType() { - return operationType; - } - - - // ----- Object overrides -------------------------------------------------- - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - EntityNotificationImpl that = (EntityNotificationImpl) o; - return Objects.equals(entity, that.entity) && - operationType == that.operationType && - Objects.equals(traits, that.traits); - } - - @Override - public int hashCode() { - return Objects.hash(entity, operationType, traits); - } - - - // ----- helper methods ---------------------------------------------------- - - private static List<IStruct> getAllTraits(IReferenceableInstance entityDefinition, - TypeSystem typeSystem) throws AtlasException { - List<IStruct> traitInfo = new LinkedList<>(); - for (String traitName : entityDefinition.getTraits()) { - IStruct trait = entityDefinition.getTrait(traitName); - String typeName = trait.getTypeName(); - Map<String, Object> valuesMap = trait.getValuesMap(); - traitInfo.add(new Struct(typeName, valuesMap)); - traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem)); - } - return traitInfo; - } - - private static List<IStruct> getSuperTraits( - String typeName, Map<String, Object> values, TypeSystem typeSystem) throws AtlasException { - - List<IStruct> superTypes = new LinkedList<>(); - - TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName); - Set<String> superTypeNames = traitDef.getAllSuperTypeNames(); - - for (String superTypeName : superTypeNames) { - TraitType superTraitDef = typeSystem.getDataType(TraitType.class, superTypeName); - - Map<String, Object> superTypeValues = new HashMap<>(); - - FieldMapping fieldMapping = superTraitDef.fieldMapping(); - - if (fieldMapping != null) { - Set<String> superTypeAttributeNames = fieldMapping.fields.keySet(); - - for (String superTypeAttributeName : superTypeAttributeNames) { - if (values.containsKey(superTypeAttributeName)) { - superTypeValues.put(superTypeAttributeName, values.get(superTypeAttributeName)); - } - } - } - IStruct superTrait = new Struct(superTypeName, superTypeValues); - superTypes.add(superTrait); - superTypes.addAll(getSuperTraits(superTypeName, values, typeSystem)); - } - - return superTypes; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java index 8337de0..cab442f 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java @@ -18,21 +18,20 @@ package org.apache.atlas.notification.hook; -import com.google.gson.JsonDeserializer; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.NotificationInterface; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.Map; + /** * Hook notification message deserializer. */ -public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification.HookNotificationMessage> { +public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNotification> { /** * Logger for hook notification messages. @@ -46,15 +45,19 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot * Create a hook notification message deserializer. */ public HookMessageDeserializer() { - super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE, - AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); + super(new TypeReference<HookNotification>() {}, + new TypeReference<AtlasNotificationMessage<HookNotification>>() {}, + AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } + @Override + public HookNotification deserialize(String messageJson) { + final HookNotification ret = super.deserialize(messageJson); - // ----- helper methods -------------------------------------------------- + if (ret != null) { + ret.normalize(); + } - private static Map<Type, JsonDeserializer> getDeserializerMap() { - return Collections.<Type, JsonDeserializer>singletonMap( - NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification()); + return ret; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java deleted file mode 100644 index a25aa52..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.hook; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.apache.commons.lang.StringUtils; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; - -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Contains the structure of messages transferred from hooks to atlas. - */ -public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> { - - @Override - public HookNotificationMessage deserialize(JsonElement json, Type typeOfT, - JsonDeserializationContext context) { - HookNotificationType type = - context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class); - switch (type) { - case ENTITY_CREATE: - return context.deserialize(json, EntityCreateRequest.class); - - case ENTITY_FULL_UPDATE: - return context.deserialize(json, EntityUpdateRequest.class); - - case ENTITY_PARTIAL_UPDATE: - return context.deserialize(json, EntityPartialUpdateRequest.class); - - case ENTITY_DELETE: - return context.deserialize(json, EntityDeleteRequest.class); - - case TYPE_CREATE: - case TYPE_UPDATE: - return context.deserialize(json, TypeRequest.class); - - default: - throw new IllegalStateException("Unhandled type " + type); - } - } - - /** - * Type of the hook message. - */ - public enum HookNotificationType { - TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE - } - - /** - * Base type of hook message. - */ - public static class HookNotificationMessage { - public static final String UNKNOW_USER = "UNKNOWN"; - protected HookNotificationType type; - protected String user; - - private HookNotificationMessage() { - } - - public HookNotificationMessage(HookNotificationType type, String user) { - this.type = type; - this.user = user; - } - - public HookNotificationType getType() { - return type; - } - - public String getUser() { - if (StringUtils.isEmpty(user)) { - return UNKNOW_USER; - } - return user; - } - - - } - - /** - * Hook message for create type definitions. - */ - public static class TypeRequest extends HookNotificationMessage { - private TypesDef typesDef; - - private TypeRequest() { - } - - public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) { - super(type, user); - this.typesDef = typesDef; - } - - public TypesDef getTypesDef() { - return typesDef; - } - } - - /** - * Hook message for creating new entities. - */ - public static class EntityCreateRequest extends HookNotificationMessage { - private List<Referenceable> entities; - - private EntityCreateRequest() { - } - - public EntityCreateRequest(String user, Referenceable... entities) { - this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user); - } - - public EntityCreateRequest(String user, List<Referenceable> entities) { - this(HookNotificationType.ENTITY_CREATE, entities, user); - } - - protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) { - super(type, user); - this.entities = entities; - } - - public EntityCreateRequest(String user, JSONArray jsonArray) { - super(HookNotificationType.ENTITY_CREATE, user); - entities = new ArrayList<>(); - for (int index = 0; index < jsonArray.length(); index++) { - try { - entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index), true)); - } catch (JSONException e) { - throw new JsonParseException(e); - } - } - } - - public List<Referenceable> getEntities() { - return entities; - } - - @Override - public String toString() { - return entities.toString(); - } - } - - /** - * Hook message for updating entities(full update). - */ - public static class EntityUpdateRequest extends EntityCreateRequest { - public EntityUpdateRequest(String user, Referenceable... entities) { - this(user, Arrays.asList(entities)); - } - - public EntityUpdateRequest(String user, List<Referenceable> entities) { - super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user); - } - } - - /** - * Hook message for updating entities(partial update). - */ - public static class EntityPartialUpdateRequest extends HookNotificationMessage { - private String typeName; - private String attribute; - private Referenceable entity; - private String attributeValue; - - private EntityPartialUpdateRequest() { - } - - public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, - Referenceable entity) { - super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user); - this.typeName = typeName; - this.attribute = attribute; - this.attributeValue = attributeValue; - this.entity = entity; - } - - public String getTypeName() { - return typeName; - } - - public String getAttribute() { - return attribute; - } - - public Referenceable getEntity() { - return entity; - } - - public String getAttributeValue() { - return attributeValue; - } - - @Override - public String toString() { - return "{" - + "entityType='" + typeName + '\'' - + ", attribute=" + attribute - + ", value=" + attributeValue - + ", entity=" + entity - + '}'; - } - } - - /** - * Hook message for creating new entities. - */ - public static class EntityDeleteRequest extends HookNotificationMessage { - - private String typeName; - private String attribute; - private String attributeValue; - - private EntityDeleteRequest() { - } - - public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) { - this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue); - } - - protected EntityDeleteRequest(HookNotificationType type, - String user, String typeName, String attribute, String attributeValue) { - super(type, user); - this.typeName = typeName; - this.attribute = attribute; - this.attributeValue = attributeValue; - } - - public String getTypeName() { - return typeName; - } - - public String getAttribute() { - return attribute; - } - - public String getAttributeValue() { - return attributeValue; - } - - @Override - public String toString() { - return "{" - + "entityType='" + typeName + '\'' - + ", attribute=" + attribute - + ", value=" + attributeValue - + '}'; - } - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index d59cb1c..0a0620f 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -18,9 +18,10 @@ package org.apache.atlas.hook; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; @@ -51,41 +52,41 @@ public class AtlasHookTest { @Test (timeOut = 10000) public void testNotifyEntitiesDoesNotHangOnException() throws Exception { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 0, notificationInterface, false, failedMessagesLogger); // if we've reached here, the method finished OK. } @Test public void testNotifyEntitiesRetriesOnException() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception())).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false, failedMessagesLogger); verify(notificationInterface, times(2)). - send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); + send(NotificationInterface.NotificationType.HOOK, hookNotifications); } @Test public void testFailedMessageIsLoggedIfRequired() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message"); @@ -93,11 +94,11 @@ public class AtlasHookTest { @Test public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new NotificationException(new Exception(), Arrays.asList("test message"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, false, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger); @@ -105,15 +106,15 @@ public class AtlasHookTest { @Test public void testAllFailedMessagesAreLogged() throws NotificationException { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = - new ArrayList<HookNotification.HookNotificationMessage>() {{ - add(new HookNotification.EntityCreateRequest("user")); + List<HookNotification> hookNotifications = + new ArrayList<HookNotification>() {{ + add(new EntityCreateRequest("user")); } }; doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2"))) .when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verify(failedMessagesLogger, times(1)).log("test message1"); @@ -122,10 +123,10 @@ public class AtlasHookTest { @Test public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception { - List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); + List<HookNotification> hookNotifications = new ArrayList<>(); doThrow(new RuntimeException("test message")).when(notificationInterface) - .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages); - AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true, + .send(NotificationInterface.NotificationType.HOOK, hookNotifications); + AtlasHook.notifyEntitiesInternal(hookNotifications, 2, notificationInterface, true, failedMessagesLogger); verifyZeroInteractions(failedMessagesLogger); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 08a20bd..2e8abd7 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -18,34 +18,30 @@ package org.apache.atlas.kafka; -import kafka.message.MessageAndMetadata; -import org.apache.atlas.notification.*; -import org.apache.atlas.notification.AtlasNotificationMessage; -import org.apache.atlas.notification.entity.EntityNotificationImplTest; -import org.apache.atlas.notification.hook.HookNotification; -import org.apache.atlas.typesystem.IStruct; -import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.v1.model.instance.Referenceable; +import org.apache.atlas.v1.model.instance.Struct; +import org.apache.atlas.notification.IncompatibleVersionException; +import org.apache.atlas.notification.NotificationInterface.NotificationType; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.notification.entity.EntityNotificationTest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.codehaus.jettison.json.JSONException; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.util.Collections; -import java.util.LinkedList; import java.util.List; -import java.util.Arrays; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -55,7 +51,6 @@ import static org.testng.Assert.*; * KafkaConsumer tests. */ public class KafkaConsumerTest { - private static final String TRAIT_NAME = "MyTrait"; @@ -70,88 +65,62 @@ public class KafkaConsumerTest { @Test public void testReceive() throws Exception { - - - MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); - + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); + TopicPartition tp = new TopicPartition("ATLAS_HOOK", 0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp, klist); + ConsumerRecords records = new ConsumerRecords(mp); when(kafkaConsumer.poll(100)).thenReturn(records); - when(messageAndMetadata.message()).thenReturn(json); + kafkaConsumer.assign(Collections.singletonList(tp)); + + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); + List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); - AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); - HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); + HookNotification consumedMessage = messageList.get(0).getMessage(); assertMessagesEqual(message, consumedMessage, entity); - } @Test public void testNextVersionMismatch() throws Exception { + Referenceable entity = getEntity(TRAIT_NAME); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", json)); + Map mp = Collections.singletonMap(tp,klist); + ConsumerRecords records = new ConsumerRecords(mp); - MessageAndMetadata<String, String> messageAndMetadata = mock(MessageAndMetadata.class); - - Referenceable entity = getEntity(TRAIT_NAME); - - HookNotification.EntityUpdateRequest message = - new HookNotification.EntityUpdateRequest("user1", entity); - - String json = AbstractNotification.GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message)); - - kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 0))); - List<ConsumerRecord> klist = new ArrayList<>(); - klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK", - 0, 0L, "mykey", json)); - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - Map mp = new HashMap(); - mp.put(tp,klist); - ConsumerRecords records = new ConsumerRecords(mp); + kafkaConsumer.assign(Collections.singletonList(tp)); when(kafkaConsumer.poll(100L)).thenReturn(records); - when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L); + try { - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); + List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); + assertTrue(messageList.size() > 0); - HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); + HookNotification consumedMessage = messageList.get(0).getMessage(); fail("Expected VersionMismatchException!"); } catch (IncompatibleVersionException e) { e.printStackTrace(); } - } @Test public void testCommitIsCalledIfAutoCommitDisabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -160,10 +129,8 @@ public class KafkaConsumerTest { @Test public void testCommitIsNotCalledIfAutoCommitEnabled() { - - TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L); + TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L); consumer.commit(tp, 1); @@ -171,26 +138,21 @@ public class KafkaConsumerTest { } private Referenceable getEntity(String traitName) { - Referenceable entity = EntityNotificationImplTest.getEntity("id"); - List<IStruct> traitInfo = new LinkedList<>(); - IStruct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); - traitInfo.add(trait); - return entity; + return EntityNotificationTest.getEntity("id", new Struct(traitName, Collections.<String, Object>emptyMap())); } - private void assertMessagesEqual(HookNotification.EntityUpdateRequest message, - HookNotification.HookNotificationMessage consumedMessage, - Referenceable entity) throws JSONException { - + private void assertMessagesEqual(EntityUpdateRequest message, + HookNotification consumedMessage, + Referenceable entity) { assertEquals(consumedMessage.getType(), message.getType()); assertEquals(consumedMessage.getUser(), message.getUser()); - assertTrue(consumedMessage instanceof HookNotification.EntityUpdateRequest); + assertTrue(consumedMessage instanceof EntityUpdateRequest); - HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest = - (HookNotification.EntityUpdateRequest) consumedMessage; + EntityUpdateRequest deserializedEntityUpdateRequest = (EntityUpdateRequest) consumedMessage; Referenceable deserializedEntity = deserializedEntityUpdateRequest.getEntities().get(0); + assertEquals(deserializedEntity.getId(), entity.getId()); assertEquals(deserializedEntity.getTypeName(), entity.getTypeName()); assertEquals(deserializedEntity.getTraits(), entity.getTraits()); http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java index 09e2e43..78d2a90 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.testng.annotations.Test; +import java.util.Arrays; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -32,7 +33,6 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import scala.actors.threadpool.Arrays; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index a1e13b9..e0655f3 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -19,29 +19,28 @@ package org.apache.atlas.kafka; import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.hook.HookNotification; -import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import org.apache.atlas.model.notification.HookNotification; import java.util.List; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; public class KafkaNotificationTest { - private KafkaNotification kafkaNotification; @BeforeClass public void setup() throws Exception { Configuration properties = ApplicationProperties.get(); + properties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); kafkaNotification = new KafkaNotification(properties); @@ -56,29 +55,27 @@ public class KafkaNotificationTest { @Test public void testReceiveKafkaMessages() throws Exception { - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u1", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u2", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u3", new Referenceable("type"))); - kafkaNotification.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest("u4", new Referenceable("type"))); - - NotificationConsumer<Object> consumer = - kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); - List<AtlasKafkaMessage<Object>> messages = null ; - long startTime = System.currentTimeMillis(); //fetch starting time + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u1", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u2", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u3", new Referenceable("type"))); + kafkaNotification.send(NotificationInterface.NotificationType.HOOK, new EntityCreateRequest("u4", new Referenceable("type"))); + + NotificationConsumer<Object> consumer = kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1).get(0); + List<AtlasKafkaMessage<Object>> messages = null ; + long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { messages = consumer.receive(); + if (messages.size() > 0) { break; } } - int i=1; + int i = 1; for (AtlasKafkaMessage<Object> msg : messages){ - HookNotification.HookNotificationMessage message = (HookNotificationMessage) msg.getMessage(); + HookNotification message = (HookNotification) msg.getMessage(); + assertEquals(message.getUser(), "u"+i++); }