http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java index 988d98a..8bc7cb4 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java @@ -18,17 +18,19 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; import org.apache.atlas.AtlasException; import org.apache.atlas.ha.HAConfiguration; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONArray; @@ -44,8 +46,8 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED; -import static org.apache.atlas.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES; +import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_COMPRESSION_ENABLED; +import static org.apache.atlas.model.notification.AtlasNotificationBaseMessage.MESSAGE_MAX_LENGTH_BYTES; /** * Abstract notification interface implementation. @@ -78,14 +80,6 @@ public abstract class AbstractNotification implements NotificationInterface { private final boolean embedded; private final boolean isHAEnabled; - /** - * Used for message serialization. - */ - public static final Gson GSON = new GsonBuilder(). - registerTypeAdapter(Referenceable.class, new ReferenceableSerializer()). - registerTypeAdapter(JSONArray.class, new JSONArraySerializer()). - create(); - // ----- Constructors ---------------------------------------------------- public AbstractNotification(Configuration applicationProperties) throws AtlasException { @@ -158,7 +152,7 @@ public abstract class AbstractNotification implements NotificationInterface { public static String getMessageJson(Object message) { AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message); - return GSON.toJson(notificationMsg); + return AtlasType.toV1Json(notificationMsg); } private static String getHostAddress() { @@ -188,7 +182,7 @@ public abstract class AbstractNotification implements NotificationInterface { */ public static void createNotificationMessages(Object message, List<String> msgJsonList) { AtlasNotificationMessage<?> notificationMsg = new AtlasNotificationMessage<>(CURRENT_MESSAGE_VERSION, message, getHostAddress(), getCurrentUser()); - String msgJson = GSON.toJson(notificationMsg); + String msgJson = AtlasType.toV1Json(notificationMsg); boolean msgLengthExceedsLimit = (msgJson.length() * MAX_BYTES_PER_CHAR) > MESSAGE_MAX_LENGTH_BYTES; @@ -213,7 +207,7 @@ public abstract class AbstractNotification implements NotificationInterface { if (!msgLengthExceedsLimit) { // no need to split AtlasNotificationStringMessage compressedMsg = new AtlasNotificationStringMessage(encodedBytes, msgId, compressionKind); - msgJson = GSON.toJson(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above + msgJson = AtlasType.toV1Json(compressedMsg); // msgJson will not have multi-byte characters here, due to use of encodeBase64() above msgBytes = null; // not used after this point } else { // encodedBytes will be split msgJson = null; // not used after this point @@ -239,7 +233,7 @@ public abstract class AbstractNotification implements NotificationInterface { AtlasNotificationStringMessage splitMsg = new AtlasNotificationStringMessage(encodedBytes, offset, length, msgId, compressionKind, i, splitCount); - String splitMsgJson = GSON.toJson(splitMsg); + String splitMsgJson = AtlasType.toV1Json(splitMsg); msgJsonList.add(splitMsgJson);
http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java index 8cf1e8e..c3940ce 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java @@ -16,30 +16,19 @@ * limitations under the License. */ package org.apache.atlas.notification; + import org.apache.kafka.common.TopicPartition; + /** * Abstract notification consumer. */ public abstract class AbstractNotificationConsumer<T> implements NotificationConsumer<T> { + protected final AtlasNotificationMessageDeserializer<T> deserializer; - /** - * Deserializer used to deserialize notification messages for this consumer. - */ - protected final MessageDeserializer<T> deserializer; - - - - /** - * Construct an AbstractNotificationConsumer. - * - * @param deserializer the message deserializer used by this consumer - */ - public AbstractNotificationConsumer(MessageDeserializer<T> deserializer) { + protected AbstractNotificationConsumer(AtlasNotificationMessageDeserializer<T> deserializer) { this.deserializer = deserializer; } - - public abstract void commit(TopicPartition partition, long offset); } http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java deleted file mode 100644 index 3b377de..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - - -import org.apache.atlas.AtlasConfiguration; -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.codec.binary.StringUtils; -import org.apache.commons.compress.utils.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.zip.GZIPInputStream; -import java.util.zip.GZIPOutputStream; - - -public class AtlasNotificationBaseMessage { - private static final Logger LOG = LoggerFactory.getLogger(AtlasNotificationBaseMessage.class); - - public static final int MESSAGE_MAX_LENGTH_BYTES = AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 bytes for envelop; - public static final boolean MESSAGE_COMPRESSION_ENABLED = AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean(); - - public enum CompressionKind { NONE, GZIP }; - - private MessageVersion version = null; - private String msgId = null; - private CompressionKind msgCompressionKind = CompressionKind.NONE; - private int msgSplitIdx = 1; - private int msgSplitCount = 1; - - - public AtlasNotificationBaseMessage() { - } - - public AtlasNotificationBaseMessage(MessageVersion version) { - this(version, null, CompressionKind.NONE); - } - - public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind) { - this.version = version; - this.msgId = msgId; - this.msgCompressionKind = msgCompressionKind; - } - - public AtlasNotificationBaseMessage(MessageVersion version, String msgId, CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) { - this.version = version; - this.msgId = msgId; - this.msgCompressionKind = msgCompressionKind; - this.msgSplitIdx = msgSplitIdx; - this.msgSplitCount = msgSplitCount; - } - - public void setVersion(MessageVersion version) { - this.version = version; - } - - public MessageVersion getVersion() { - return version; - } - - public String getMsgId() { - return msgId; - } - - public void setMsgId(String msgId) { - this.msgId = msgId; - } - - public CompressionKind getMsgCompressionKind() { - return msgCompressionKind; - } - - public void setMsgCompressed(CompressionKind msgCompressionKind) { - this.msgCompressionKind = msgCompressionKind; - } - - public int getMsgSplitIdx() { - return msgSplitIdx; - } - - public void setMsgSplitIdx(int msgSplitIdx) { - this.msgSplitIdx = msgSplitIdx; - } - - public int getMsgSplitCount() { - return msgSplitCount; - } - - public void setMsgSplitCount(int msgSplitCount) { - this.msgSplitCount = msgSplitCount; - } - - /** - * Compare the version of this message with the given version. - * - * @param compareToVersion the version to compare to - * - * @return a negative integer, zero, or a positive integer as this message's version is less than, equal to, - * or greater than the given version. - */ - public int compareVersion(MessageVersion compareToVersion) { - return version.compareTo(compareToVersion); - } - - - public static byte[] getBytesUtf8(String str) { - return StringUtils.getBytesUtf8(str); - } - - public static String getStringUtf8(byte[] bytes) { - return StringUtils.newStringUtf8(bytes); - } - - public static byte[] encodeBase64(byte[] bytes) { - return Base64.encodeBase64(bytes); - } - - public static byte[] decodeBase64(byte[] bytes) { - return Base64.decodeBase64(bytes); - } - - public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) { - return encodeBase64(gzipCompress(bytes)); - } - - public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) { - return gzipUncompress(decodeBase64(bytes)); - } - - public static String gzipCompress(String str) { - byte[] bytes = getBytesUtf8(str); - byte[] compressedBytes = gzipCompress(bytes); - byte[] encodedBytes = encodeBase64(compressedBytes); - - return getStringUtf8(encodedBytes); - } - - public static String gzipUncompress(String str) { - byte[] encodedBytes = getBytesUtf8(str); - byte[] compressedBytes = decodeBase64(encodedBytes); - byte[] bytes = gzipUncompress(compressedBytes); - - return getStringUtf8(bytes); - } - - public static byte[] gzipCompress(byte[] content) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - try { - GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); - - gzipOutputStream.write(content); - gzipOutputStream.close(); - } catch (IOException e) { - LOG.error("gzipCompress(): error compressing {} bytes", content.length, e); - - throw new RuntimeException(e); - } - - return byteArrayOutputStream.toByteArray(); - } - - public static byte[] gzipUncompress(byte[] content) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - - try { - IOUtils.copy(new GZIPInputStream(new ByteArrayInputStream(content)), out); - } catch (IOException e) { - LOG.error("gzipUncompress(): error uncompressing {} bytes", content.length, e); - } - - return out.toByteArray(); - } -} - http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java deleted file mode 100644 index 63d93c9..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import org.joda.time.DateTimeZone; -import org.joda.time.Instant; - -/** - * Represents a notification message that is associated with a version. - */ -public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage { - private String msgSourceIP; - private String msgCreatedBy; - private long msgCreationTime; - - /** - * The actual message. - */ - private final T message; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a notification message. - * - * @param version the message version - * @param message the actual message - */ - public AtlasNotificationMessage(MessageVersion version, T message) { - this(version, message, null, null); - } - - public AtlasNotificationMessage(MessageVersion version, T message, String msgSourceIP, String createdBy) { - super(version); - - this.msgSourceIP = msgSourceIP; - this.msgCreatedBy = createdBy; - this.msgCreationTime = Instant.now().toDateTime(DateTimeZone.UTC).getMillis(); - this.message = message; - } - - - public String getMsgSourceIP() { - return msgSourceIP; - } - - public void setMsgSourceIP(String msgSourceIP) { - this.msgSourceIP = msgSourceIP; - } - - public String getMsgCreatedBy() { - return msgCreatedBy; - } - - public void setMsgCreatedBy(String msgCreatedBy) { - this.msgCreatedBy = msgCreatedBy; - } - - public long getMsgCreationTime() { - return msgCreationTime; - } - - public void setMsgCreationTime(long msgCreationTime) { - this.msgCreationTime = msgCreationTime; - } - - public T getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java index 2a175ba..d6e6878 100644 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java @@ -19,14 +19,17 @@ package org.apache.atlas.notification; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; import org.apache.commons.lang3.StringUtils; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -47,11 +50,10 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message public static final String VERSION_MISMATCH_MSG = "Notification message version mismatch. Expected %s but recieved %s. Message %s"; - private final Type notificationMessageType; - private final Type messageType; - private final MessageVersion expectedVersion; - private final Logger notificationLogger; - private final Gson gson; + private final TypeReference<T> messageType; + private final TypeReference<AtlasNotificationMessage<T>> notificationMessageType; + private final MessageVersion expectedVersion; + private final Logger notificationLogger; private final Map<String, SplitMessageAggregator> splitMsgBuffer = new HashMap<>(); @@ -65,33 +67,40 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message /** * Create a notification message deserializer. * - * @param notificationMessageType the type of the notification message * @param expectedVersion the expected message version - * @param gson JSON serialization/deserialization * @param notificationLogger logger for message version mismatch */ - public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, - Gson gson, Logger notificationLogger) { - this(notificationMessageType, expectedVersion, gson, notificationLogger, + public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, + TypeReference<AtlasNotificationMessage<T>> notificationMessageType, + MessageVersion expectedVersion, Logger notificationLogger) { + this(messageType, notificationMessageType, expectedVersion, notificationLogger, NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 1000, NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000); } - public AtlasNotificationMessageDeserializer(Type notificationMessageType, MessageVersion expectedVersion, - Gson gson, Logger notificationLogger, + public AtlasNotificationMessageDeserializer(TypeReference<T> messageType, + TypeReference<AtlasNotificationMessage<T>> notificationMessageType, + MessageVersion expectedVersion, + Logger notificationLogger, long splitMessageSegmentsWaitTimeMs, long splitMessageBufferPurgeIntervalMs) { + this.messageType = messageType; this.notificationMessageType = notificationMessageType; - this.messageType = ((ParameterizedType) notificationMessageType).getActualTypeArguments()[0]; this.expectedVersion = expectedVersion; - this.gson = gson; this.notificationLogger = notificationLogger; this.splitMessageSegmentsWaitTimeMs = splitMessageSegmentsWaitTimeMs; this.splitMessageBufferPurgeIntervalMs = splitMessageBufferPurgeIntervalMs; } - // ----- MessageDeserializer --------------------------------------------- + public TypeReference<T> getMessageType() { + return messageType; + } + public TypeReference<AtlasNotificationMessage<T>> getNotificationMessageType() { + return notificationMessageType; + } + + // ----- MessageDeserializer --------------------------------------------- @Override public T deserialize(String messageJson) { final T ret; @@ -99,15 +108,15 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message messageCountTotal.incrementAndGet(); messageCountSinceLastInterval.incrementAndGet(); - AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, AtlasNotificationBaseMessage.class); + AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, AtlasNotificationBaseMessage.class); if (msg.getVersion() == null) { // older style messages not wrapped with AtlasNotificationMessage - ret = gson.fromJson(messageJson, messageType); + ret = AtlasType.fromV1Json(messageJson, messageType); } else { String msgJson = messageJson; if (msg.getMsgSplitCount() > 1) { // multi-part message - AtlasNotificationStringMessage splitMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + AtlasNotificationStringMessage splitMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class); checkVersion(splitMsg, msgJson); @@ -184,7 +193,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message LOG.info("Received msgID={}: splitCount={}, length={} bytes", msgId, splitCount, bytes.length); } - msg = gson.fromJson(msgJson, AtlasNotificationBaseMessage.class); + msg = AtlasType.fromV1Json(msgJson, AtlasNotificationBaseMessage.class); } else { msg = null; } @@ -197,7 +206,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message if (msg != null) { if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) { - AtlasNotificationStringMessage compressedMsg = gson.fromJson(msgJson, AtlasNotificationStringMessage.class); + AtlasNotificationStringMessage compressedMsg = AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class); byte[] encodedBytes = AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage()); byte[] bytes = AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes); @@ -207,7 +216,7 @@ public abstract class AtlasNotificationMessageDeserializer<T> implements Message LOG.info("Received msgID={}: compressed={} bytes, uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, bytes.length); } - AtlasNotificationMessage<T> atlasNotificationMessage = gson.fromJson(msgJson, notificationMessageType); + AtlasNotificationMessage<T> atlasNotificationMessage = AtlasType.fromV1Json(msgJson, notificationMessageType); checkVersion(atlasNotificationMessage, msgJson); http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java deleted file mode 100644 index 41485a0..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - - -public class AtlasNotificationStringMessage extends AtlasNotificationBaseMessage { - private String message = null; - - public AtlasNotificationStringMessage() { - super(AbstractNotification.CURRENT_MESSAGE_VERSION); - } - - public AtlasNotificationStringMessage(String message) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION); - - this.message = message; - } - - public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); - - this.message = message; - } - - public AtlasNotificationStringMessage(String message, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); - - this.message = message; - } - - public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, CompressionKind compressionKind) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind); - - this.message = AtlasNotificationBaseMessage.getStringUtf8(encodedBytes); - } - - public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) { - super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, compressionKind, msgSplitIdx, msgSplitCount); - - this.message = new String(encodedBytes, offset, length); - } - - public void setMessage(String message) { - this.message = message; - } - - public String getMessage() { - return message; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java deleted file mode 100644 index 7f96638..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.atlas.notification; - -import java.util.ArrayList; -import java.util.Arrays; - -/** - * Represents the version of a notification message. - */ -public class MessageVersion implements Comparable<MessageVersion> { - /** - * Used for message with no version (old format). - */ - public static final MessageVersion NO_VERSION = new MessageVersion("0"); - public static final MessageVersion VERSION_1 = new MessageVersion("1.0.0"); - - public static final MessageVersion CURRENT_VERSION = VERSION_1; - - private final String version; - - - // ----- Constructors ---------------------------------------------------- - - /** - * Create a message version. - * - * @param version the version string - */ - public MessageVersion(String version) { - this.version = version; - - try { - getVersionParts(); - } catch (NumberFormatException e) { - throw new IllegalArgumentException(String.format("Invalid version string : %s.", version), e); - } - } - - - // ----- Comparable ------------------------------------------------------ - - @Override - public int compareTo(MessageVersion that) { - if (that == null) { - return 1; - } - - Integer[] thisParts = getVersionParts(); - Integer[] thatParts = that.getVersionParts(); - - int length = Math.max(thisParts.length, thatParts.length); - - for (int i = 0; i < length; i++) { - - int comp = getVersionPart(thisParts, i) - getVersionPart(thatParts, i); - - if (comp != 0) { - return comp; - } - } - return 0; - } - - - // ----- Object overrides ------------------------------------------------ - - @Override - public boolean equals(Object that) { - if (this == that){ - return true; - } - - if (that == null || getClass() != that.getClass()) { - return false; - } - - return compareTo((MessageVersion) that) == 0; - } - - @Override - public int hashCode() { - return Arrays.hashCode(getVersionParts()); - } - - - @Override - public String toString() { - return "MessageVersion[version=" + version + "]"; - } - - // ----- helper methods -------------------------------------------------- - - /** - * Get the version parts array by splitting the version string. - * Strip the trailing zeros (i.e. '1.0.0' equals '1'). - * - * @return the version parts array - */ - protected Integer[] getVersionParts() { - - String[] sParts = version.split("\\."); - ArrayList<Integer> iParts = new ArrayList<>(); - int trailingZeros = 0; - - for (String sPart : sParts) { - Integer iPart = new Integer(sPart); - - if (iPart == 0) { - ++trailingZeros; - } else { - for (int i = 0; i < trailingZeros; ++i) { - iParts.add(0); - } - trailingZeros = 0; - iParts.add(iPart); - } - } - return iParts.toArray(new Integer[iParts.size()]); - } - - private Integer getVersionPart(Integer[] versionParts, int i) { - return i < versionParts.length ? versionParts[i] : 0; - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java index 8809225..975967d 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java @@ -18,10 +18,13 @@ package org.apache.atlas.notification; import com.google.gson.reflect.TypeToken; +import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.entity.EntityMessageDeserializer; -import org.apache.atlas.notification.entity.EntityNotification; import org.apache.atlas.notification.hook.HookMessageDeserializer; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.v1.model.notification.EntityNotification; +import org.apache.atlas.v1.model.notification.HookNotification; +import org.codehaus.jackson.type.TypeReference; +import scala.reflect.internal.Types; import java.lang.reflect.Type; import java.util.List; @@ -53,47 +56,25 @@ public interface NotificationInterface { /** * Versioned notification message class types. */ - Type HOOK_VERSIONED_MESSAGE_TYPE = - new TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType(); - Type ENTITY_VERSIONED_MESSAGE_TYPE = new TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType(); /** * Atlas notification types. */ enum NotificationType { - // Notifications from the Atlas integration hooks. - HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()), + HOOK(new HookMessageDeserializer()), // Notifications to entity change consumers. - ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer()); - - - /** - * The notification class associated with this type. - */ - private final Class classType; - - /** - * The message deserializer for this type. - */ - private final MessageDeserializer deserializer; + ENTITIES(new EntityMessageDeserializer()); + private final AtlasNotificationMessageDeserializer deserializer; - NotificationType(Class classType, MessageDeserializer<?> deserializer) { - this.classType = classType; + NotificationType(AtlasNotificationMessageDeserializer deserializer) { this.deserializer = deserializer; } - - // ----- accessors --------------------------------------------------- - - public Class getClassType() { - return classType; - } - - public MessageDeserializer getDeserializer() { + public AtlasNotificationMessageDeserializer getDeserializer() { return deserializer; } } http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java index 148b57f..10df121 100644 --- a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java +++ b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java @@ -18,6 +18,8 @@ package org.apache.atlas.notification; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; + public class SplitMessageAggregator { private final String msgId; private final AtlasNotificationStringMessage[] splitMessagesBuffer; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java index a6f7e64..526aa93 100644 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java @@ -18,19 +18,14 @@ package org.apache.atlas.notification.entity; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; +import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.v1.model.notification.EntityNotification; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.Map; - /** * Entity notification message deserializer. */ @@ -48,29 +43,8 @@ public class EntityMessageDeserializer extends AbstractMessageDeserializer<Entit * Create an entity notification message deserializer. */ public EntityMessageDeserializer() { - super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE, - AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); - } - - - // ----- helper methods -------------------------------------------------- - - private static Map<Type, JsonDeserializer> getDeserializerMap() { - return Collections.<Type, JsonDeserializer>singletonMap( - NotificationInterface.ENTITY_NOTIFICATION_CLASS, new EntityNotificationDeserializer()); - } - - - // ----- deserializer classes -------------------------------------------- - - /** - * Deserializer for EntityNotification. - */ - protected static final class EntityNotificationDeserializer implements JsonDeserializer<EntityNotification> { - @Override - public EntityNotification deserialize(final JsonElement json, final Type type, - final JsonDeserializationContext context) { - return context.deserialize(json, EntityNotificationImpl.class); - } + super(new TypeReference<EntityNotification>() {}, + new TypeReference<AtlasNotificationMessage<EntityNotification>>() {}, + AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } } http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java deleted file mode 100644 index 96e2e2f..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.entity; - - -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; - -import java.util.List; - -/** - * Notification of entity changes. - */ -public interface EntityNotification { - - /** - * Operations that result in an entity notification. - */ - enum OperationType { - ENTITY_CREATE, - ENTITY_UPDATE, - ENTITY_DELETE, - TRAIT_ADD, - TRAIT_DELETE, - TRAIT_UPDATE - } - - - // ----- EntityNotification ------------------------------------------------ - - /** - * Get the entity that is associated with this notification. - * - * @return the associated entity - */ - Referenceable getEntity(); - - /** - * Get flattened list of traits that are associated with this entity (includes super traits). - * - * @return the list of all traits - */ - List<Struct> getAllTraits(); - - /** - * Get the type of operation that triggered this notification. - * - * @return the operation type - */ - OperationType getOperationType(); -} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java deleted file mode 100644 index ab8e4c8..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java +++ /dev/null @@ -1,157 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.entity; - -import org.apache.atlas.AtlasException; -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.type.AtlasClassificationType; -import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.collections.MapUtils; - -import java.util.Collections; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -/** - * Entity notification implementation. - */ -public class EntityNotificationImpl implements EntityNotification { - - private final Referenceable entity; - private final OperationType operationType; - private final List<Struct> traits; - - - // ----- Constructors ------------------------------------------------------ - - /** - * No-arg constructor for serialization. - */ - @SuppressWarnings("unused") - private EntityNotificationImpl() throws AtlasException { - this(null, OperationType.ENTITY_CREATE, Collections.<Struct>emptyList()); - } - - /** - * Construct an EntityNotification. - * - * @param entity the entity subject of the notification - * @param operationType the type of operation that caused the notification - * @param traits the traits for the given entity - * - * @throws AtlasException if the entity notification can not be created - */ - public EntityNotificationImpl(Referenceable entity, OperationType operationType, List<Struct> traits) - throws AtlasException { - this.entity = entity; - this.operationType = operationType; - this.traits = traits; - } - - /** - * Construct an EntityNotification. - * - * @param entity the entity subject of the notification - * @param operationType the type of operation that caused the notification - * @param typeRegistry the Atlas type system - * - * @throws AtlasException if the entity notification can not be created - */ - public EntityNotificationImpl(Referenceable entity, OperationType operationType, AtlasTypeRegistry typeRegistry) - throws AtlasException { - this(entity, operationType, getAllTraits(entity, typeRegistry)); - } - - - // ----- EntityNotification ------------------------------------------------ - - @Override - public Referenceable getEntity() { - return entity; - } - - @Override - public List<Struct> getAllTraits() { - return traits; - } - - @Override - public OperationType getOperationType() { - return operationType; - } - - - // ----- Object overrides -------------------------------------------------- - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - EntityNotificationImpl that = (EntityNotificationImpl) o; - return Objects.equals(entity, that.entity) && - operationType == that.operationType && - Objects.equals(traits, that.traits); - } - - @Override - public int hashCode() { - return Objects.hash(entity, operationType, traits); - } - - - // ----- helper methods ---------------------------------------------------- - - private static List<Struct> getAllTraits(Referenceable entityDefinition, AtlasTypeRegistry typeRegistry) throws AtlasException { - List<Struct> ret = new LinkedList<>(); - - for (String traitName : entityDefinition.getTraitNames()) { - Struct trait = entityDefinition.getTrait(traitName); - AtlasClassificationType traitType = typeRegistry.getClassificationTypeByName(traitName); - Set<String> superTypeNames = traitType != null ? traitType.getAllSuperTypes() : null; - - ret.add(trait); - - if (CollectionUtils.isNotEmpty(superTypeNames)) { - for (String superTypeName : superTypeNames) { - Struct superTypeTrait = new Struct(superTypeName); - - if (MapUtils.isNotEmpty(trait.getValues())) { - AtlasClassificationType superType = typeRegistry.getClassificationTypeByName(superTypeName); - - if (superType != null && MapUtils.isNotEmpty(superType.getAllAttributes())) { - Map<String, Object> attributes = new HashMap<>(); - - // TODO: add superTypeTrait attributess - - superTypeTrait.setValues(attributes); - } - } - - ret.add(superTypeTrait); - } - } - } - - return ret; - }} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java index 8337de0..1b337d4 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java @@ -18,16 +18,15 @@ package org.apache.atlas.notification.hook; -import com.google.gson.JsonDeserializer; +import org.apache.atlas.model.notification.AtlasNotificationMessage; import org.apache.atlas.notification.AbstractMessageDeserializer; import org.apache.atlas.notification.AbstractNotification; -import org.apache.atlas.notification.NotificationInterface; +import org.apache.atlas.v1.model.notification.HookNotification; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; -import java.util.Collections; -import java.util.Map; + /** * Hook notification message deserializer. @@ -46,15 +45,11 @@ public class HookMessageDeserializer extends AbstractMessageDeserializer<HookNot * Create a hook notification message deserializer. */ public HookMessageDeserializer() { - super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE, - AbstractNotification.CURRENT_MESSAGE_VERSION, getDeserializerMap(), NOTIFICATION_LOGGER); + super(new TypeReference<HookNotification.HookNotificationMessage>() {}, + new TypeReference<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>() {}, + AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } // ----- helper methods -------------------------------------------------- - - private static Map<Type, JsonDeserializer> getDeserializerMap() { - return Collections.<Type, JsonDeserializer>singletonMap( - NotificationInterface.HOOK_NOTIFICATION_CLASS, new HookNotification()); - } } http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java deleted file mode 100644 index ca596ea..0000000 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.notification.hook; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.typedef.TypesDef; -import org.apache.atlas.type.AtlasType; -import org.apache.commons.lang.StringUtils; -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONException; - -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Contains the structure of messages transferred from hooks to atlas. - */ -public class HookNotification implements JsonDeserializer<HookNotification.HookNotificationMessage> { - - @Override - public HookNotificationMessage deserialize(JsonElement json, Type typeOfT, - JsonDeserializationContext context) { - HookNotificationType type = - context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class); - switch (type) { - case ENTITY_CREATE: - return context.deserialize(json, EntityCreateRequest.class); - - case ENTITY_FULL_UPDATE: - return context.deserialize(json, EntityUpdateRequest.class); - - case ENTITY_PARTIAL_UPDATE: - return context.deserialize(json, EntityPartialUpdateRequest.class); - - case ENTITY_DELETE: - return context.deserialize(json, EntityDeleteRequest.class); - - case TYPE_CREATE: - case TYPE_UPDATE: - return context.deserialize(json, TypeRequest.class); - - default: - throw new IllegalStateException("Unhandled type " + type); - } - } - - /** - * Type of the hook message. - */ - public enum HookNotificationType { - TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, ENTITY_FULL_UPDATE, ENTITY_DELETE - } - - /** - * Base type of hook message. - */ - public static class HookNotificationMessage { - public static final String UNKNOW_USER = "UNKNOWN"; - protected HookNotificationType type; - protected String user; - - private HookNotificationMessage() { - } - - public HookNotificationMessage(HookNotificationType type, String user) { - this.type = type; - this.user = user; - } - - public HookNotificationType getType() { - return type; - } - - public String getUser() { - if (StringUtils.isEmpty(user)) { - return UNKNOW_USER; - } - return user; - } - - - } - - /** - * Hook message for create type definitions. - */ - public static class TypeRequest extends HookNotificationMessage { - private TypesDef typesDef; - - private TypeRequest() { - } - - public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) { - super(type, user); - this.typesDef = typesDef; - } - - public TypesDef getTypesDef() { - return typesDef; - } - } - - /** - * Hook message for creating new entities. - */ - public static class EntityCreateRequest extends HookNotificationMessage { - private List<Referenceable> entities; - - private EntityCreateRequest() { - } - - public EntityCreateRequest(String user, Referenceable... entities) { - this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user); - } - - public EntityCreateRequest(String user, List<Referenceable> entities) { - this(HookNotificationType.ENTITY_CREATE, entities, user); - } - - protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) { - super(type, user); - this.entities = entities; - } - - public EntityCreateRequest(String user, JSONArray jsonArray) { - super(HookNotificationType.ENTITY_CREATE, user); - entities = new ArrayList<>(); - for (int index = 0; index < jsonArray.length(); index++) { - try { - entities.add(AtlasType.fromV1Json(jsonArray.getString(index), Referenceable.class)); - } catch (JSONException e) { - throw new JsonParseException(e); - } - } - } - - public List<Referenceable> getEntities() { - return entities; - } - - @Override - public String toString() { - return entities.toString(); - } - } - - /** - * Hook message for updating entities(full update). - */ - public static class EntityUpdateRequest extends EntityCreateRequest { - public EntityUpdateRequest(String user, Referenceable... entities) { - this(user, Arrays.asList(entities)); - } - - public EntityUpdateRequest(String user, List<Referenceable> entities) { - super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user); - } - } - - /** - * Hook message for updating entities(partial update). - */ - public static class EntityPartialUpdateRequest extends HookNotificationMessage { - private String typeName; - private String attribute; - private Referenceable entity; - private String attributeValue; - - private EntityPartialUpdateRequest() { - } - - public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, - Referenceable entity) { - super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user); - this.typeName = typeName; - this.attribute = attribute; - this.attributeValue = attributeValue; - this.entity = entity; - } - - public String getTypeName() { - return typeName; - } - - public String getAttribute() { - return attribute; - } - - public Referenceable getEntity() { - return entity; - } - - public String getAttributeValue() { - return attributeValue; - } - - @Override - public String toString() { - return "{" - + "entityType='" + typeName + '\'' - + ", attribute=" + attribute - + ", value=" + attributeValue - + ", entity=" + entity - + '}'; - } - } - - /** - * Hook message for creating new entities. - */ - public static class EntityDeleteRequest extends HookNotificationMessage { - - private String typeName; - private String attribute; - private String attributeValue; - - private EntityDeleteRequest() { - } - - public EntityDeleteRequest(String user, String typeName, String attribute, String attributeValue) { - this(HookNotificationType.ENTITY_DELETE, user, typeName, attribute, attributeValue); - } - - protected EntityDeleteRequest(HookNotificationType type, - String user, String typeName, String attribute, String attributeValue) { - super(type, user); - this.typeName = typeName; - this.attribute = attribute; - this.attributeValue = attributeValue; - } - - public String getTypeName() { - return typeName; - } - - public String getAttribute() { - return attribute; - } - - public String getAttributeValue() { - return attributeValue; - } - - @Override - public String toString() { - return "{" - + "entityType='" + typeName + '\'' - + ", attribute=" + attribute - + ", value=" + attributeValue - + '}'; - } - } -} http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java index d59cb1c..9ce2a50 100644 --- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java +++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java @@ -20,7 +20,7 @@ package org.apache.atlas.hook; import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotification; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 071a725..f1fc741 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -22,10 +22,11 @@ import kafka.message.MessageAndMetadata; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.notification.*; -import org.apache.atlas.notification.AtlasNotificationMessage; -import org.apache.atlas.notification.entity.EntityNotificationImplTest; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.notification.entity.EntityNotificationTest; +import org.apache.atlas.v1.model.notification.HookNotification; import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -96,7 +97,7 @@ public class KafkaConsumerTest { when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L); List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); @@ -131,7 +132,7 @@ public class KafkaConsumerTest { when(kafkaConsumer.poll(100L)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer ,false, 100L); try { List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); @@ -151,7 +152,7 @@ public class KafkaConsumerTest { TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -163,7 +164,7 @@ public class KafkaConsumerTest { TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK, kafkaConsumer, true , 100L); consumer.commit(tp, 1); @@ -171,7 +172,7 @@ public class KafkaConsumerTest { } private Referenceable getEntity(String traitName) { - Referenceable entity = EntityNotificationImplTest.getEntity("id"); + Referenceable entity = EntityNotificationTest.getEntity("id"); List<Struct> traitInfo = new LinkedList<>(); Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); traitInfo.add(trait); http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index 5e3cf41..fe019e1 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -22,13 +22,13 @@ import org.apache.atlas.ApplicationProperties; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.notification.NotificationConsumer; import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotification; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage; +import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; import java.util.List; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index f313ddc..caa72ce 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -18,13 +18,15 @@ package org.apache.atlas.notification; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.model.notification.MessageVersion; +import org.codehaus.jackson.type.TypeReference; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import java.lang.reflect.Type; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -41,8 +43,6 @@ import org.apache.kafka.common.TopicPartition; */ public class AbstractNotificationConsumerTest { - private static final Gson GSON = new Gson(); - @Test public void testReceive() throws Exception { Logger logger = mock(Logger.class); @@ -54,27 +54,24 @@ public class AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1))); - jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2))); - jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3))); - jsonList.add(GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4))); - - Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); + jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1))); + jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage2))); + jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage3))); + jsonList.add(AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage4))); - NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); + NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); assertFalse(messageList.isEmpty()); - assertEquals(testMessage1, messageList.get(0).getMessage()); + assertEquals(messageList.get(0).getMessage(), testMessage1); - assertEquals(testMessage2, messageList.get(1).getMessage()); + assertEquals(messageList.get(1).getMessage(), testMessage2); - assertEquals(testMessage3, messageList.get(2).getMessage()); + assertEquals(messageList.get(2).getMessage(), testMessage3); - assertEquals(testMessage4, messageList.get(3).getMessage()); + assertEquals(messageList.get(3).getMessage(), testMessage4); } @Test @@ -88,20 +85,17 @@ public class AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); - String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2)); - String json3 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3)); - String json4 = GSON.toJson(testMessage4); + String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.0.5"), testMessage2)); + String json3 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("0.5.0"), testMessage3)); + String json4 = AtlasType.toV1Json(testMessage4); jsonList.add(json1); jsonList.add(json2); jsonList.add(json3); jsonList.add(json4); - Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); - - NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); + NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger); List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); @@ -124,16 +118,13 @@ public class AbstractNotificationConsumerTest { List jsonList = new LinkedList<>(); - String json1 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); - String json2 = GSON.toJson(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2)); + String json1 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), testMessage1)); + String json2 = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("2.0.0"), testMessage2)); jsonList.add(json1); jsonList.add(json2); - Type notificationMessageType = new TypeToken<AtlasNotificationMessage<TestMessage>>(){}.getType(); - - NotificationConsumer<TestMessage> consumer = - new TestNotificationConsumer<>(notificationMessageType, jsonList, logger); + NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer(jsonList, logger); try { List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); @@ -150,7 +141,10 @@ public class AbstractNotificationConsumerTest { private static class TestMessage { private String s; - private int i; + private int i; + + public TestMessage() { + } public TestMessage(String s, int i) { this.s = s; @@ -165,6 +159,14 @@ public class AbstractNotificationConsumerTest { this.s = s; } + public int getI() { + return i; + } + + public void setI(int i) { + this.i = i; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -180,12 +182,14 @@ public class AbstractNotificationConsumerTest { } } - private static class TestNotificationConsumer<T> extends AbstractNotificationConsumer<T> { - private final List<T> messageList; - private int index = 0; + private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> { + private final List<TestMessage> messageList; + private int index = 0; + + + public TestNotificationConsumer(List<TestMessage> messages, Logger logger) { + super(new TestMessageDeserializer()); - public TestNotificationConsumer(Type notificationMessageType, List<T> messages, Logger logger) { - super(new TestDeserializer<T>(notificationMessageType, logger)); this.messageList = messages; } @@ -205,24 +209,35 @@ public class AbstractNotificationConsumerTest { } @Override - public List<AtlasKafkaMessage<T>> receive() { + public List<AtlasKafkaMessage<TestMessage>> receive() { return receive(1000L); } @Override - public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { - List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList(); + public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) { + List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList(); for(Object json : messageList) { - tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1)); + tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1)); } return tempMessageList; } } - private static final class TestDeserializer<T> extends AtlasNotificationMessageDeserializer<T> { + public static class TestMessageDeserializer extends AbstractMessageDeserializer<TestMessage> { + /** + * Logger for hook notification messages. + */ + private static final Logger NOTIFICATION_LOGGER = LoggerFactory.getLogger(TestMessageDeserializer.class); + + + // ----- Constructors ---------------------------------------------------- - private TestDeserializer(Type notificationMessageType, Logger logger) { - super(notificationMessageType, AbstractNotification.CURRENT_MESSAGE_VERSION, GSON, logger); + /** + * Create a hook notification message deserializer. + */ + public TestMessageDeserializer() { + super(new TypeReference<TestMessage>() {}, new TypeReference<AtlasNotificationMessage<TestMessage>>() {}, + AbstractNotification.CURRENT_MESSAGE_VERSION, NOTIFICATION_LOGGER); } } } http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java index 655252c..98d7d2c 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationTest.java @@ -19,7 +19,8 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasException; -import org.apache.atlas.notification.hook.HookNotification; +import org.apache.atlas.type.AtlasType; +import org.apache.atlas.v1.model.notification.HookNotification; import org.apache.commons.configuration.Configuration; import org.testng.annotations.Test; @@ -98,8 +99,8 @@ public class AbstractNotificationTest { // ignore msgCreationTime in Json private void assertEqualsMessageJson(String msgJsonActual, String msgJsonExpected) { - Map<Object, Object> msgActual = AbstractNotification.GSON.fromJson(msgJsonActual, Map.class); - Map<Object, Object> msgExpected = AbstractNotification.GSON.fromJson(msgJsonExpected, Map.class); + Map<Object, Object> msgActual = AtlasType.fromV1Json(msgJsonActual, Map.class); + Map<Object, Object> msgExpected = AtlasType.fromV1Json(msgJsonExpected, Map.class); msgActual.remove("msgCreationTime"); msgExpected.remove("msgCreationTime"); http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java index 27b5034..91a195d 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AtlasNotificationMessageTest.java @@ -18,6 +18,8 @@ package org.apache.atlas.notification; +import org.apache.atlas.model.notification.AtlasNotificationMessage; +import org.apache.atlas.model.notification.MessageVersion; import org.testng.annotations.Test; import static org.testng.Assert.*; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java index d1af4b0..d8b3b34 100644 --- a/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/MessageVersionTest.java @@ -18,6 +18,7 @@ package org.apache.atlas.notification; +import org.apache.atlas.model.notification.MessageVersion; import org.testng.annotations.Test; import java.util.Arrays; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java index 0807221..b79735a 100644 --- a/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/SplitMessageAggregatorTest.java @@ -17,7 +17,8 @@ */ package org.apache.atlas.notification; -import org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind; +import org.apache.atlas.model.notification.AtlasNotificationStringMessage; import org.testng.Assert; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/atlas/blob/64e739da/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java index faafb87..ddb63b5 100644 --- a/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/entity/EntityMessageDeserializerTest.java @@ -21,6 +21,7 @@ package org.apache.atlas.notification.entity; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; import org.apache.atlas.notification.AbstractNotification; +import org.apache.atlas.v1.model.notification.EntityNotification; import org.testng.annotations.Test; import java.util.ArrayList; @@ -34,19 +35,18 @@ import static org.testng.Assert.assertEquals; * EntityMessageDeserializer tests. */ public class EntityMessageDeserializerTest { + private EntityMessageDeserializer deserializer = new EntityMessageDeserializer(); @Test public void testDeserialize() throws Exception { - EntityMessageDeserializer deserializer = new EntityMessageDeserializer(); - - Referenceable entity = EntityNotificationImplTest.getEntity("id"); + Referenceable entity = EntityNotificationTest.getEntity("id"); String traitName = "MyTrait"; List<Struct> traitInfo = new LinkedList<>(); Struct trait = new Struct(traitName, Collections.<String, Object>emptyMap()); traitInfo.add(trait); - EntityNotificationImpl notification = - new EntityNotificationImpl(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); + EntityNotification notification = + new EntityNotification(entity, EntityNotification.OperationType.TRAIT_ADD, traitInfo); List<String> jsonMsgList = new ArrayList<>(); @@ -55,7 +55,7 @@ public class EntityMessageDeserializerTest { EntityNotification deserializedNotification = null; for (String jsonMsg : jsonMsgList) { - deserializedNotification = deserializer.deserialize(jsonMsg); + deserializedNotification = deserializer.deserialize(jsonMsg); if (deserializedNotification != null) { break;