http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
deleted file mode 100644
index 3b377de..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationBaseMessage.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-import org.apache.atlas.AtlasConfiguration;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-
-public class AtlasNotificationBaseMessage {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasNotificationBaseMessage.class);
-
-    public static final int     MESSAGE_MAX_LENGTH_BYTES    = 
AtlasConfiguration.NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES.getInt() - 512; // 512 
bytes for envelop;
-    public static final boolean MESSAGE_COMPRESSION_ENABLED = 
AtlasConfiguration.NOTIFICATION_MESSAGE_COMPRESSION_ENABLED.getBoolean();
-
-    public enum CompressionKind { NONE, GZIP };
-
-    private MessageVersion  version            = null;
-    private String          msgId              = null;
-    private CompressionKind msgCompressionKind = CompressionKind.NONE;
-    private int             msgSplitIdx        = 1;
-    private int             msgSplitCount      = 1;
-
-
-    public AtlasNotificationBaseMessage() {
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version) {
-        this(version, null, CompressionKind.NONE);
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, 
CompressionKind msgCompressionKind) {
-        this.version            = version;
-        this.msgId              = msgId;
-        this.msgCompressionKind = msgCompressionKind;
-    }
-
-    public AtlasNotificationBaseMessage(MessageVersion version, String msgId, 
CompressionKind msgCompressionKind, int msgSplitIdx, int msgSplitCount) {
-        this.version            = version;
-        this.msgId              = msgId;
-        this.msgCompressionKind = msgCompressionKind;
-        this.msgSplitIdx        = msgSplitIdx;
-        this.msgSplitCount      = msgSplitCount;
-    }
-
-    public void setVersion(MessageVersion version) {
-        this.version = version;
-    }
-
-    public MessageVersion getVersion() {
-        return version;
-    }
-
-    public String getMsgId() {
-        return msgId;
-    }
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-    public CompressionKind getMsgCompressionKind() {
-        return msgCompressionKind;
-    }
-
-    public void setMsgCompressed(CompressionKind msgCompressionKind) {
-        this.msgCompressionKind = msgCompressionKind;
-    }
-
-    public int getMsgSplitIdx() {
-        return msgSplitIdx;
-    }
-
-    public void setMsgSplitIdx(int msgSplitIdx) {
-        this.msgSplitIdx = msgSplitIdx;
-    }
-
-    public int getMsgSplitCount() {
-        return msgSplitCount;
-    }
-
-    public void setMsgSplitCount(int msgSplitCount) {
-        this.msgSplitCount = msgSplitCount;
-    }
-
-    /**
-     * Compare the version of this message with the given version.
-     *
-     * @param compareToVersion  the version to compare to
-     *
-     * @return a negative integer, zero, or a positive integer as this 
message's version is less than, equal to,
-     *         or greater than the given version.
-     */
-    public int compareVersion(MessageVersion compareToVersion) {
-        return version.compareTo(compareToVersion);
-    }
-
-
-    public static byte[] getBytesUtf8(String str) {
-        return StringUtils.getBytesUtf8(str);
-    }
-
-    public static String getStringUtf8(byte[] bytes) {
-        return StringUtils.newStringUtf8(bytes);
-    }
-
-    public static byte[] encodeBase64(byte[] bytes) {
-        return Base64.encodeBase64(bytes);
-    }
-
-    public static byte[] decodeBase64(byte[] bytes) {
-        return Base64.decodeBase64(bytes);
-    }
-
-    public static byte[] gzipCompressAndEncodeBase64(byte[] bytes) {
-        return encodeBase64(gzipCompress(bytes));
-    }
-
-    public static byte[] decodeBase64AndGzipUncompress(byte[] bytes) {
-        return gzipUncompress(decodeBase64(bytes));
-    }
-
-    public static String gzipCompress(String str) {
-        byte[] bytes           = getBytesUtf8(str);
-        byte[] compressedBytes = gzipCompress(bytes);
-        byte[] encodedBytes    = encodeBase64(compressedBytes);
-
-        return getStringUtf8(encodedBytes);
-    }
-
-    public static String gzipUncompress(String str) {
-        byte[] encodedBytes    = getBytesUtf8(str);
-        byte[] compressedBytes = decodeBase64(encodedBytes);
-        byte[] bytes           = gzipUncompress(compressedBytes);
-
-        return getStringUtf8(bytes);
-    }
-
-    public static byte[] gzipCompress(byte[] content) {
-        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
-
-        try {
-            GZIPOutputStream gzipOutputStream = new 
GZIPOutputStream(byteArrayOutputStream);
-
-            gzipOutputStream.write(content);
-            gzipOutputStream.close();
-        } catch (IOException e) {
-            LOG.error("gzipCompress(): error compressing {} bytes", 
content.length, e);
-
-            throw new RuntimeException(e);
-        }
-
-        return byteArrayOutputStream.toByteArray();
-    }
-
-    public static byte[] gzipUncompress(byte[] content) {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-
-        try {
-            IOUtils.copy(new GZIPInputStream(new 
ByteArrayInputStream(content)), out);
-        } catch (IOException e) {
-            LOG.error("gzipUncompress(): error uncompressing {} bytes", 
content.length, e);
-        }
-
-        return out.toByteArray();
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
deleted file mode 100644
index 63d93c9..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessage.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import org.joda.time.DateTimeZone;
-import org.joda.time.Instant;
-
-/**
- * Represents a notification message that is associated with a version.
- */
-public class AtlasNotificationMessage<T> extends AtlasNotificationBaseMessage {
-    private String msgSourceIP;
-    private String msgCreatedBy;
-    private long   msgCreationTime;
-
-    /**
-     * The actual message.
-     */
-    private final T message;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a notification message.
-     *
-     * @param version  the message version
-     * @param message  the actual message
-     */
-    public AtlasNotificationMessage(MessageVersion version, T message) {
-        this(version, message, null, null);
-    }
-
-    public AtlasNotificationMessage(MessageVersion version, T message, String 
msgSourceIP, String createdBy) {
-        super(version);
-
-        this.msgSourceIP     = msgSourceIP;
-        this.msgCreatedBy    = createdBy;
-        this.msgCreationTime = 
Instant.now().toDateTime(DateTimeZone.UTC).getMillis();
-        this.message         = message;
-    }
-
-
-    public String getMsgSourceIP() {
-        return msgSourceIP;
-    }
-
-    public void setMsgSourceIP(String msgSourceIP) {
-        this.msgSourceIP = msgSourceIP;
-    }
-
-    public String getMsgCreatedBy() {
-        return msgCreatedBy;
-    }
-
-    public void setMsgCreatedBy(String msgCreatedBy) {
-        this.msgCreatedBy = msgCreatedBy;
-    }
-
-    public long getMsgCreationTime() {
-        return msgCreationTime;
-    }
-
-    public void setMsgCreationTime(long msgCreationTime) {
-        this.msgCreationTime = msgCreationTime;
-    }
-
-    public T getMessage() {
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
index 2a175ba..d6e6878 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationMessageDeserializer.java
@@ -19,14 +19,17 @@
 package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.gson.Gson;
-import 
org.apache.atlas.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationBaseMessage;
+import 
org.apache.atlas.model.notification.AtlasNotificationBaseMessage.CompressionKind;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -47,11 +50,10 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
     public static final String VERSION_MISMATCH_MSG =
         "Notification message version mismatch. Expected %s but recieved %s. 
Message %s";
 
-    private final Type notificationMessageType;
-    private final Type messageType;
-    private final MessageVersion expectedVersion;
-    private final Logger notificationLogger;
-    private final Gson gson;
+    private final TypeReference<T>                           messageType;
+    private final TypeReference<AtlasNotificationMessage<T>> 
notificationMessageType;
+    private final MessageVersion                             expectedVersion;
+    private final Logger                                     
notificationLogger;
 
 
     private final Map<String, SplitMessageAggregator> splitMsgBuffer = new 
HashMap<>();
@@ -65,33 +67,40 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
     /**
      * Create a notification message deserializer.
      *
-     * @param notificationMessageType the type of the notification message
      * @param expectedVersion         the expected message version
-     * @param gson                    JSON serialization/deserialization
      * @param notificationLogger      logger for message version mismatch
      */
-    public AtlasNotificationMessageDeserializer(Type notificationMessageType, 
MessageVersion expectedVersion,
-                                                Gson gson, Logger 
notificationLogger) {
-        this(notificationMessageType, expectedVersion, gson, 
notificationLogger,
+    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+                                                
TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+                                                MessageVersion 
expectedVersion, Logger notificationLogger) {
+        this(messageType, notificationMessageType, expectedVersion, 
notificationLogger,
              NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS.getLong() * 
1000,
              
NOTIFICATION_SPLIT_MESSAGE_BUFFER_PURGE_INTERVAL_SECONDS.getLong() * 1000);
     }
 
-    public AtlasNotificationMessageDeserializer(Type notificationMessageType, 
MessageVersion expectedVersion,
-                                                Gson gson, Logger 
notificationLogger,
+    public AtlasNotificationMessageDeserializer(TypeReference<T> messageType,
+                                                
TypeReference<AtlasNotificationMessage<T>> notificationMessageType,
+                                                MessageVersion expectedVersion,
+                                                Logger notificationLogger,
                                                 long 
splitMessageSegmentsWaitTimeMs,
                                                 long 
splitMessageBufferPurgeIntervalMs) {
+        this.messageType                       = messageType;
         this.notificationMessageType           = notificationMessageType;
-        this.messageType                       = ((ParameterizedType) 
notificationMessageType).getActualTypeArguments()[0];
         this.expectedVersion                   = expectedVersion;
-        this.gson                              = gson;
         this.notificationLogger                = notificationLogger;
         this.splitMessageSegmentsWaitTimeMs    = 
splitMessageSegmentsWaitTimeMs;
         this.splitMessageBufferPurgeIntervalMs = 
splitMessageBufferPurgeIntervalMs;
     }
 
-    // ----- MessageDeserializer ---------------------------------------------
+    public TypeReference<T> getMessageType() {
+        return messageType;
+    }
 
+    public TypeReference<AtlasNotificationMessage<T>> 
getNotificationMessageType() {
+        return notificationMessageType;
+    }
+
+    // ----- MessageDeserializer ---------------------------------------------
     @Override
     public T deserialize(String messageJson) {
         final T ret;
@@ -99,15 +108,15 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
         messageCountTotal.incrementAndGet();
         messageCountSinceLastInterval.incrementAndGet();
 
-        AtlasNotificationBaseMessage msg = gson.fromJson(messageJson, 
AtlasNotificationBaseMessage.class);
+        AtlasNotificationBaseMessage msg = AtlasType.fromV1Json(messageJson, 
AtlasNotificationBaseMessage.class);
 
         if (msg.getVersion() == null) { // older style messages not wrapped 
with AtlasNotificationMessage
-            ret = gson.fromJson(messageJson, messageType);
+            ret = AtlasType.fromV1Json(messageJson, messageType);
         } else  {
             String msgJson = messageJson;
 
             if (msg.getMsgSplitCount() > 1) { // multi-part message
-                AtlasNotificationStringMessage splitMsg = 
gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+                AtlasNotificationStringMessage splitMsg = 
AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
 
                 checkVersion(splitMsg, msgJson);
 
@@ -184,7 +193,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
                                     LOG.info("Received msgID={}: 
splitCount={}, length={} bytes", msgId, splitCount, bytes.length);
                                 }
 
-                                msg = gson.fromJson(msgJson, 
AtlasNotificationBaseMessage.class);
+                                msg = AtlasType.fromV1Json(msgJson, 
AtlasNotificationBaseMessage.class);
                             } else {
                                 msg = null;
                             }
@@ -197,7 +206,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
 
             if (msg != null) {
                 if (CompressionKind.GZIP.equals(msg.getMsgCompressionKind())) {
-                    AtlasNotificationStringMessage compressedMsg = 
gson.fromJson(msgJson, AtlasNotificationStringMessage.class);
+                    AtlasNotificationStringMessage compressedMsg = 
AtlasType.fromV1Json(msgJson, AtlasNotificationStringMessage.class);
 
                     byte[] encodedBytes = 
AtlasNotificationBaseMessage.getBytesUtf8(compressedMsg.getMessage());
                     byte[] bytes        = 
AtlasNotificationBaseMessage.decodeBase64AndGzipUncompress(encodedBytes);
@@ -207,7 +216,7 @@ public abstract class 
AtlasNotificationMessageDeserializer<T> implements Message
                     LOG.info("Received msgID={}: compressed={} bytes, 
uncompressed={} bytes", compressedMsg.getMsgId(), encodedBytes.length, 
bytes.length);
                 }
 
-                AtlasNotificationMessage<T> atlasNotificationMessage = 
gson.fromJson(msgJson, notificationMessageType);
+                AtlasNotificationMessage<T> atlasNotificationMessage = 
AtlasType.fromV1Json(msgJson, notificationMessageType);
 
                 checkVersion(atlasNotificationMessage, msgJson);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
 
b/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
deleted file mode 100644
index 41485a0..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/AtlasNotificationStringMessage.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-
-public class AtlasNotificationStringMessage extends 
AtlasNotificationBaseMessage {
-    private String message = null;
-
-    public AtlasNotificationStringMessage() {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
-    }
-
-    public AtlasNotificationStringMessage(String message) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(String message, String msgId, 
CompressionKind compressionKind) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, 
compressionKind);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(String message, String msgId, 
CompressionKind compressionKind, int msgSplitIdx, int msgSplitCount) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, 
compressionKind, msgSplitIdx, msgSplitCount);
-
-        this.message = message;
-    }
-
-    public AtlasNotificationStringMessage(byte[] encodedBytes, String msgId, 
CompressionKind compressionKind) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, 
compressionKind);
-
-        this.message = 
AtlasNotificationBaseMessage.getStringUtf8(encodedBytes);
-    }
-
-    public AtlasNotificationStringMessage(byte[] encodedBytes, int offset, int 
length, String msgId, CompressionKind compressionKind, int msgSplitIdx, int 
msgSplitCount) {
-        super(AbstractNotification.CURRENT_MESSAGE_VERSION, msgId, 
compressionKind, msgSplitIdx, msgSplitCount);
-
-        this.message = new String(encodedBytes, offset, length);
-    }
-
-    public void setMessage(String message) {
-        this.message = message;
-    }
-
-    public String getMessage() {
-        return message;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java 
b/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
deleted file mode 100644
index 7f96638..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/MessageVersion.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.atlas.notification;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-
-/**
- * Represents the version of a notification message.
- */
-public class MessageVersion implements Comparable<MessageVersion> {
-    /**
-     * Used for message with no version (old format).
-     */
-    public static final MessageVersion NO_VERSION = new MessageVersion("0");
-    public static final MessageVersion VERSION_1  = new 
MessageVersion("1.0.0");
-
-    public static final MessageVersion CURRENT_VERSION = VERSION_1;
-
-    private final String version;
-
-
-    // ----- Constructors ----------------------------------------------------
-
-    /**
-     * Create a message version.
-     *
-     * @param version  the version string
-     */
-    public MessageVersion(String version) {
-        this.version = version;
-
-        try {
-            getVersionParts();
-        } catch (NumberFormatException e) {
-            throw new IllegalArgumentException(String.format("Invalid version 
string : %s.", version), e);
-        }
-    }
-
-
-    // ----- Comparable ------------------------------------------------------
-
-    @Override
-    public int compareTo(MessageVersion that) {
-        if (that == null) {
-            return 1;
-        }
-
-        Integer[] thisParts = getVersionParts();
-        Integer[] thatParts = that.getVersionParts();
-
-        int length = Math.max(thisParts.length, thatParts.length);
-
-        for (int i = 0; i < length; i++) {
-
-            int comp = getVersionPart(thisParts, i) - 
getVersionPart(thatParts, i);
-
-            if (comp != 0) {
-                return comp;
-            }
-        }
-        return 0;
-    }
-
-
-    // ----- Object overrides ------------------------------------------------
-
-    @Override
-    public boolean equals(Object that) {
-        if (this == that){
-            return true;
-        }
-
-        if (that == null || getClass() != that.getClass()) {
-            return false;
-        }
-
-        return compareTo((MessageVersion) that) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-        return Arrays.hashCode(getVersionParts());
-    }
-
-
-    @Override
-    public String toString() {
-        return "MessageVersion[version=" + version + "]";
-    }
-
-    // ----- helper methods --------------------------------------------------
-
-    /**
-     * Get the version parts array by splitting the version string.
-     * Strip the trailing zeros (i.e. '1.0.0' equals '1').
-     *
-     * @return  the version parts array
-     */
-    protected Integer[] getVersionParts() {
-
-        String[] sParts = version.split("\\.");
-        ArrayList<Integer> iParts = new ArrayList<>();
-        int trailingZeros = 0;
-
-        for (String sPart : sParts) {
-            Integer iPart = new Integer(sPart);
-
-            if (iPart == 0) {
-                ++trailingZeros;
-            } else {
-                for (int i = 0; i < trailingZeros; ++i) {
-                    iParts.add(0);
-                }
-                trailingZeros = 0;
-                iParts.add(iPart);
-            }
-        }
-        return iParts.toArray(new Integer[iParts.size()]);
-    }
-
-    private Integer getVersionPart(Integer[] versionParts, int i) {
-        return i < versionParts.length ? versionParts[i] : 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
index 8809225..6caf7e2 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java
@@ -17,13 +17,9 @@
  */
 package org.apache.atlas.notification;
 
-import com.google.gson.reflect.TypeToken;
 import org.apache.atlas.notification.entity.EntityMessageDeserializer;
-import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.notification.hook.HookMessageDeserializer;
-import org.apache.atlas.notification.hook.HookNotification;
 
-import java.lang.reflect.Type;
 import java.util.List;
 
 /**
@@ -43,57 +39,22 @@ public interface NotificationInterface {
     String PROPERTY_PREFIX = "atlas.notification";
 
     /**
-     * Notification message class types.
-     */
-    Class<HookNotification.HookNotificationMessage> HOOK_NOTIFICATION_CLASS =
-        HookNotification.HookNotificationMessage.class;
-
-    Class<EntityNotification> ENTITY_NOTIFICATION_CLASS = 
EntityNotification.class;
-
-    /**
-     * Versioned notification message class types.
-     */
-    Type HOOK_VERSIONED_MESSAGE_TYPE =
-        new 
TypeToken<AtlasNotificationMessage<HookNotification.HookNotificationMessage>>(){}.getType();
-
-    Type ENTITY_VERSIONED_MESSAGE_TYPE = new 
TypeToken<AtlasNotificationMessage<EntityNotification>>(){}.getType();
-
-    /**
      * Atlas notification types.
      */
     enum NotificationType {
-
         // Notifications from the Atlas integration hooks.
-        HOOK(HOOK_NOTIFICATION_CLASS, new HookMessageDeserializer()),
+        HOOK(new HookMessageDeserializer()),
 
         // Notifications to entity change consumers.
-        ENTITIES(ENTITY_NOTIFICATION_CLASS, new EntityMessageDeserializer());
-
-
-        /**
-         * The notification class associated with this type.
-         */
-        private final Class classType;
-
-        /**
-         * The message deserializer for this type.
-         */
-        private final MessageDeserializer deserializer;
+        ENTITIES(new EntityMessageDeserializer());
 
+        private final AtlasNotificationMessageDeserializer deserializer;
 
-        NotificationType(Class classType, MessageDeserializer<?> deserializer) 
{
-            this.classType = classType;
+        NotificationType(AtlasNotificationMessageDeserializer deserializer) {
             this.deserializer = deserializer;
         }
 
-
-        // ----- accessors ---------------------------------------------------
-
-        public Class getClassType() {
-            return classType;
-        }
-
-        public MessageDeserializer getDeserializer() {
+        public AtlasNotificationMessageDeserializer getDeserializer() {
             return deserializer;
         }
     }

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
 
b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
index 148b57f..10df121 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/SplitMessageAggregator.java
@@ -18,6 +18,8 @@
 package org.apache.atlas.notification;
 
 
+import org.apache.atlas.model.notification.AtlasNotificationStringMessage;
+
 public class SplitMessageAggregator {
     private final String                           msgId;
     private final AtlasNotificationStringMessage[] splitMessagesBuffer;

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
index a6f7e64..fa160cf 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/entity/EntityMessageDeserializer.java
@@ -18,19 +18,14 @@
 
 package org.apache.atlas.notification.entity;
 
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.EntityNotification;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
-
 /**
  * Entity notification message deserializer.
  */
@@ -48,29 +43,19 @@ public class EntityMessageDeserializer extends 
AbstractMessageDeserializer<Entit
      * Create an entity notification message deserializer.
      */
     public EntityMessageDeserializer() {
-        super(NotificationInterface.ENTITY_VERSIONED_MESSAGE_TYPE,
-            AbstractNotification.CURRENT_MESSAGE_VERSION, 
getDeserializerMap(), NOTIFICATION_LOGGER);
-    }
-
-
-    // ----- helper methods --------------------------------------------------
-
-    private static Map<Type, JsonDeserializer> getDeserializerMap() {
-        return Collections.<Type, JsonDeserializer>singletonMap(
-            NotificationInterface.ENTITY_NOTIFICATION_CLASS, new 
EntityNotificationDeserializer());
+        super(new TypeReference<EntityNotification>() {},
+              new 
TypeReference<AtlasNotificationMessage<EntityNotification>>() {},
+              AbstractNotification.CURRENT_MESSAGE_VERSION, 
NOTIFICATION_LOGGER);
     }
 
+    @Override
+    public EntityNotification deserialize(String messageJson) {
+        final EntityNotification ret = super.deserialize(messageJson);
 
-    // ----- deserializer classes --------------------------------------------
-
-    /**
-     * Deserializer for EntityNotification.
-     */
-    protected static final class EntityNotificationDeserializer implements 
JsonDeserializer<EntityNotification> {
-        @Override
-        public EntityNotification deserialize(final JsonElement json, final 
Type type,
-                                              final JsonDeserializationContext 
context) {
-            return context.deserialize(json, EntityNotificationImpl.class);
+        if (ret != null) {
+            ret.normalize();
         }
+
+        return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
 
b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
deleted file mode 100644
index 379e815..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotification.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-
-import java.util.List;
-
-/**
- * Notification of entity changes.
- */
-public interface EntityNotification {
-
-    /**
-     * Operations that result in an entity notification.
-     */
-    enum OperationType {
-        ENTITY_CREATE,
-        ENTITY_UPDATE,
-        ENTITY_DELETE,
-        TRAIT_ADD,
-        TRAIT_DELETE,
-        TRAIT_UPDATE
-    }
-
-
-    // ----- EntityNotification 
------------------------------------------------
-
-    /**
-     * Get the entity that is associated with this notification.
-     *
-     * @return the associated entity
-     */
-    IReferenceableInstance getEntity();
-
-    /**
-     * Get flattened list of traits that are associated with this entity 
(includes super traits).
-     *
-     * @return the list of all traits
-     */
-    List<IStruct> getAllTraits();
-
-    /**
-     * Get the type of operation that triggered this notification.
-     *
-     * @return the operation type
-     */
-    OperationType getOperationType();
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
 
b/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
deleted file mode 100644
index 6a9b362..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/entity/EntityNotificationImpl.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.entity;
-
-import org.apache.atlas.AtlasException;
-import org.apache.atlas.typesystem.IReferenceableInstance;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
-import org.apache.atlas.typesystem.types.FieldMapping;
-import org.apache.atlas.typesystem.types.TraitType;
-import org.apache.atlas.typesystem.types.TypeSystem;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * Entity notification implementation.
- */
-public class EntityNotificationImpl implements EntityNotification {
-
-    private final Referenceable entity;
-    private final OperationType operationType;
-    private final List<IStruct> traits;
-
-
-    // ----- Constructors 
------------------------------------------------------
-
-    /**
-     * No-arg constructor for serialization.
-     */
-    @SuppressWarnings("unused")
-    private EntityNotificationImpl() throws AtlasException {
-        this(null, OperationType.ENTITY_CREATE, 
Collections.<IStruct>emptyList());
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity            the entity subject of the notification
-     * @param operationType     the type of operation that caused the 
notification
-     * @param traits            the traits for the given entity
-     *
-     * @throws AtlasException if the entity notification can not be created
-     */
-    public EntityNotificationImpl(Referenceable entity, OperationType 
operationType, List<IStruct> traits)
-        throws AtlasException {
-        this.entity = entity;
-        this.operationType = operationType;
-        this.traits = traits;
-    }
-
-    /**
-     * Construct an EntityNotification.
-     *
-     * @param entity         the entity subject of the notification
-     * @param operationType  the type of operation that caused the notification
-     * @param typeSystem     the Atlas type system
-     *
-     * @throws AtlasException if the entity notification can not be created
-     */
-    public EntityNotificationImpl(Referenceable entity, OperationType 
operationType, TypeSystem typeSystem)
-        throws AtlasException {
-        this(entity, operationType, getAllTraits(entity, typeSystem));
-    }
-
-
-    // ----- EntityNotification 
------------------------------------------------
-
-    @Override
-    public IReferenceableInstance getEntity() {
-        return entity;
-    }
-
-    @Override
-    public List<IStruct> getAllTraits() {
-        return traits;
-    }
-
-    @Override
-    public OperationType getOperationType() {
-        return operationType;
-    }
-
-
-    // ----- Object overrides 
--------------------------------------------------
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        EntityNotificationImpl that = (EntityNotificationImpl) o;
-        return Objects.equals(entity, that.entity) &&
-                operationType == that.operationType &&
-                Objects.equals(traits, that.traits);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(entity, operationType, traits);
-    }
-
-
-    // ----- helper methods 
----------------------------------------------------
-
-    private static List<IStruct> getAllTraits(IReferenceableInstance 
entityDefinition,
-                                              TypeSystem typeSystem) throws 
AtlasException {
-        List<IStruct> traitInfo = new LinkedList<>();
-        for (String traitName : entityDefinition.getTraits()) {
-            IStruct trait = entityDefinition.getTrait(traitName);
-            String typeName = trait.getTypeName();
-            Map<String, Object> valuesMap = trait.getValuesMap();
-            traitInfo.add(new Struct(typeName, valuesMap));
-            traitInfo.addAll(getSuperTraits(typeName, valuesMap, typeSystem));
-        }
-        return traitInfo;
-    }
-
-    private static List<IStruct> getSuperTraits(
-            String typeName, Map<String, Object> values, TypeSystem 
typeSystem) throws AtlasException {
-
-        List<IStruct> superTypes = new LinkedList<>();
-
-        TraitType traitDef = typeSystem.getDataType(TraitType.class, typeName);
-        Set<String> superTypeNames = traitDef.getAllSuperTypeNames();
-
-        for (String superTypeName : superTypeNames) {
-            TraitType superTraitDef = typeSystem.getDataType(TraitType.class, 
superTypeName);
-
-            Map<String, Object> superTypeValues = new HashMap<>();
-
-            FieldMapping fieldMapping = superTraitDef.fieldMapping();
-
-            if (fieldMapping != null) {
-                Set<String> superTypeAttributeNames = 
fieldMapping.fields.keySet();
-
-                for (String superTypeAttributeName : superTypeAttributeNames) {
-                    if (values.containsKey(superTypeAttributeName)) {
-                        superTypeValues.put(superTypeAttributeName, 
values.get(superTypeAttributeName));
-                    }
-                }
-            }
-            IStruct superTrait = new Struct(superTypeName, superTypeValues);
-            superTypes.add(superTrait);
-            superTypes.addAll(getSuperTraits(superTypeName, values, 
typeSystem));
-        }
-
-        return superTypes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
 
b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
index 8337de0..cab442f 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/hook/HookMessageDeserializer.java
@@ -18,21 +18,20 @@
 
 package org.apache.atlas.notification.hook;
 
-import com.google.gson.JsonDeserializer;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.AbstractMessageDeserializer;
 import org.apache.atlas.notification.AbstractNotification;
-import org.apache.atlas.notification.NotificationInterface;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
-import java.util.Collections;
-import java.util.Map;
+
 
 /**
  * Hook notification message deserializer.
  */
-public class HookMessageDeserializer extends 
AbstractMessageDeserializer<HookNotification.HookNotificationMessage> {
+public class HookMessageDeserializer extends 
AbstractMessageDeserializer<HookNotification> {
 
     /**
      * Logger for hook notification messages.
@@ -46,15 +45,19 @@ public class HookMessageDeserializer extends 
AbstractMessageDeserializer<HookNot
      * Create a hook notification message deserializer.
      */
     public HookMessageDeserializer() {
-        super(NotificationInterface.HOOK_VERSIONED_MESSAGE_TYPE,
-            AbstractNotification.CURRENT_MESSAGE_VERSION, 
getDeserializerMap(), NOTIFICATION_LOGGER);
+        super(new TypeReference<HookNotification>() {},
+              new TypeReference<AtlasNotificationMessage<HookNotification>>() 
{},
+              AbstractNotification.CURRENT_MESSAGE_VERSION, 
NOTIFICATION_LOGGER);
     }
 
+    @Override
+    public HookNotification deserialize(String messageJson) {
+        final HookNotification ret = super.deserialize(messageJson);
 
-    // ----- helper methods --------------------------------------------------
+        if (ret != null) {
+            ret.normalize();
+        }
 
-    private static Map<Type, JsonDeserializer> getDeserializerMap() {
-        return Collections.<Type, JsonDeserializer>singletonMap(
-            NotificationInterface.HOOK_NOTIFICATION_CLASS, new 
HookNotification());
+        return ret;
     }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
 
b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
deleted file mode 100644
index a25aa52..0000000
--- 
a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.notification.hook;
-
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.TypesDef;
-import org.apache.atlas.typesystem.json.InstanceSerialization;
-import org.apache.commons.lang.StringUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Contains the structure of messages transferred from hooks to atlas.
- */
-public class HookNotification implements 
JsonDeserializer<HookNotification.HookNotificationMessage> {
-
-    @Override
-    public HookNotificationMessage deserialize(JsonElement json, Type typeOfT,
-                                               JsonDeserializationContext 
context) {
-        HookNotificationType type =
-                context.deserialize(((JsonObject) json).get("type"), 
HookNotificationType.class);
-        switch (type) {
-        case ENTITY_CREATE:
-            return context.deserialize(json, EntityCreateRequest.class);
-
-        case ENTITY_FULL_UPDATE:
-            return context.deserialize(json, EntityUpdateRequest.class);
-
-        case ENTITY_PARTIAL_UPDATE:
-            return context.deserialize(json, EntityPartialUpdateRequest.class);
-
-        case ENTITY_DELETE:
-            return context.deserialize(json, EntityDeleteRequest.class);
-
-        case TYPE_CREATE:
-        case TYPE_UPDATE:
-            return context.deserialize(json, TypeRequest.class);
-
-        default:
-            throw new IllegalStateException("Unhandled type " + type);
-        }
-    }
-
-    /**
-     * Type of the hook message.
-     */
-    public enum HookNotificationType {
-        TYPE_CREATE, TYPE_UPDATE, ENTITY_CREATE, ENTITY_PARTIAL_UPDATE, 
ENTITY_FULL_UPDATE, ENTITY_DELETE
-    }
-
-    /**
-     * Base type of hook message.
-     */
-    public static class HookNotificationMessage {
-        public static final String UNKNOW_USER = "UNKNOWN";
-        protected HookNotificationType type;
-        protected String user;
-
-        private HookNotificationMessage() {
-        }
-
-        public HookNotificationMessage(HookNotificationType type, String user) 
{
-            this.type = type;
-            this.user = user;
-        }
-
-        public HookNotificationType getType() {
-            return type;
-        }
-
-        public String getUser() {
-            if (StringUtils.isEmpty(user)) {
-                return UNKNOW_USER;
-            }
-            return user;
-        }
-
-
-    }
-
-    /**
-     * Hook message for create type definitions.
-     */
-    public static class TypeRequest extends HookNotificationMessage {
-        private TypesDef typesDef;
-
-        private TypeRequest() {
-        }
-
-        public TypeRequest(HookNotificationType type, TypesDef typesDef, 
String user) {
-            super(type, user);
-            this.typesDef = typesDef;
-        }
-
-        public TypesDef getTypesDef() {
-            return typesDef;
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    public static class EntityCreateRequest extends HookNotificationMessage {
-        private List<Referenceable> entities;
-
-        private EntityCreateRequest() {
-        }
-
-        public EntityCreateRequest(String user, Referenceable... entities) {
-            this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), 
user);
-        }
-
-        public EntityCreateRequest(String user, List<Referenceable> entities) {
-            this(HookNotificationType.ENTITY_CREATE, entities, user);
-        }
-
-        protected EntityCreateRequest(HookNotificationType type, 
List<Referenceable> entities, String user) {
-            super(type, user);
-            this.entities = entities;
-        }
-
-        public EntityCreateRequest(String user, JSONArray jsonArray) {
-            super(HookNotificationType.ENTITY_CREATE, user);
-            entities = new ArrayList<>();
-            for (int index = 0; index < jsonArray.length(); index++) {
-                try {
-                    
entities.add(InstanceSerialization.fromJsonReferenceable(jsonArray.getString(index),
 true));
-                } catch (JSONException e) {
-                    throw new JsonParseException(e);
-                }
-            }
-        }
-
-        public List<Referenceable> getEntities() {
-            return entities;
-        }
-
-        @Override
-        public String toString() {
-            return entities.toString();
-        }
-    }
-
-    /**
-     * Hook message for updating entities(full update).
-     */
-    public static class EntityUpdateRequest extends EntityCreateRequest {
-        public EntityUpdateRequest(String user, Referenceable... entities) {
-            this(user, Arrays.asList(entities));
-        }
-
-        public EntityUpdateRequest(String user, List<Referenceable> entities) {
-            super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user);
-        }
-    }
-
-    /**
-     * Hook message for updating entities(partial update).
-     */
-    public static class EntityPartialUpdateRequest extends 
HookNotificationMessage {
-        private String typeName;
-        private String attribute;
-        private Referenceable entity;
-        private String attributeValue;
-
-        private EntityPartialUpdateRequest() {
-        }
-
-        public EntityPartialUpdateRequest(String user, String typeName, String 
attribute, String attributeValue,
-                                          Referenceable entity) {
-            super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user);
-            this.typeName = typeName;
-            this.attribute = attribute;
-            this.attributeValue = attributeValue;
-            this.entity = entity;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public Referenceable getEntity() {
-            return entity;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        @Override
-        public String toString() {
-            return  "{"
-                + "entityType='" + typeName + '\''
-                + ", attribute=" + attribute
-                + ", value=" + attributeValue
-                + ", entity=" + entity
-                + '}';
-        }
-    }
-
-    /**
-     * Hook message for creating new entities.
-     */
-    public static class EntityDeleteRequest extends HookNotificationMessage {
-
-        private String typeName;
-        private String attribute;
-        private String attributeValue;
-
-        private EntityDeleteRequest() {
-        }
-
-        public EntityDeleteRequest(String user, String typeName, String 
attribute, String attributeValue) {
-            this(HookNotificationType.ENTITY_DELETE, user, typeName, 
attribute, attributeValue);
-        }
-
-        protected EntityDeleteRequest(HookNotificationType type,
-            String user, String typeName, String attribute, String 
attributeValue) {
-            super(type, user);
-            this.typeName = typeName;
-            this.attribute = attribute;
-            this.attributeValue = attributeValue;
-        }
-
-        public String getTypeName() {
-            return typeName;
-        }
-
-        public String getAttribute() {
-            return attribute;
-        }
-
-        public String getAttributeValue() {
-            return attributeValue;
-        }
-
-        @Override
-        public String toString() {
-            return  "{"
-                + "entityType='" + typeName + '\''
-                + ", attribute=" + attribute
-                + ", value=" + attributeValue
-                + '}';
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java 
b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index d59cb1c..0a0620f 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -18,9 +18,10 @@
 
 package org.apache.atlas.hook;
 
+import org.apache.atlas.model.notification.HookNotification;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
@@ -51,41 +52,41 @@ public class AtlasHookTest {
 
     @Test (timeOut = 10000)
     public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, 
notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 0, 
notificationInterface, false,
                 failedMessagesLogger);
         // if we've reached here, the method finished OK.
     }
 
     @Test
     public void testNotifyEntitiesRetriesOnException() throws 
NotificationException {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new 
Exception())).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verify(notificationInterface, times(2)).
-                send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
+                send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
     }
 
     @Test
     public void testFailedMessageIsLoggedIfRequired() throws 
NotificationException {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message");
@@ -93,11 +94,11 @@ public class AtlasHookTest {
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotRequired() throws 
NotificationException {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, false,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, false,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);
@@ -105,15 +106,15 @@ public class AtlasHookTest {
 
     @Test
     public void testAllFailedMessagesAreLogged() throws NotificationException {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages =
-                new ArrayList<HookNotification.HookNotificationMessage>() {{
-                    add(new HookNotification.EntityCreateRequest("user"));
+        List<HookNotification> hookNotifications =
+                new ArrayList<HookNotification>() {{
+                    add(new EntityCreateRequest("user"));
                 }
             };
         doThrow(new NotificationException(new Exception(), Arrays.asList("test 
message1", "test message2")))
                 .when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verify(failedMessagesLogger, times(1)).log("test message1");
@@ -122,10 +123,10 @@ public class AtlasHookTest {
 
     @Test
     public void testFailedMessageIsNotLoggedIfNotANotificationException() 
throws Exception {
-        List<HookNotification.HookNotificationMessage> 
hookNotificationMessages = new ArrayList<>();
+        List<HookNotification> hookNotifications = new ArrayList<>();
         doThrow(new RuntimeException("test 
message")).when(notificationInterface)
-                .send(NotificationInterface.NotificationType.HOOK, 
hookNotificationMessages);
-        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, 
notificationInterface, true,
+                .send(NotificationInterface.NotificationType.HOOK, 
hookNotifications);
+        AtlasHook.notifyEntitiesInternal(hookNotifications, 2, 
notificationInterface, true,
                 failedMessagesLogger);
 
         verifyZeroInteractions(failedMessagesLogger);

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 08a20bd..2e8abd7 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -18,34 +18,30 @@
 
 package org.apache.atlas.kafka;
 
-import kafka.message.MessageAndMetadata;
-import org.apache.atlas.notification.*;
-import org.apache.atlas.notification.AtlasNotificationMessage;
-import org.apache.atlas.notification.entity.EntityNotificationImplTest;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.IStruct;
-import org.apache.atlas.typesystem.Referenceable;
-import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.model.notification.HookNotification;
+import org.apache.atlas.v1.model.instance.Referenceable;
+import org.apache.atlas.v1.model.instance.Struct;
+import org.apache.atlas.notification.IncompatibleVersionException;
+import org.apache.atlas.notification.NotificationInterface.NotificationType;
+import org.apache.atlas.model.notification.AtlasNotificationMessage;
+import org.apache.atlas.notification.entity.EntityNotificationTest;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest;
+import org.apache.atlas.type.AtlasType;
+import org.apache.atlas.model.notification.MessageVersion;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.codehaus.jettison.json.JSONException;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -55,7 +51,6 @@ import static org.testng.Assert.*;
  * KafkaConsumer tests.
  */
 public class KafkaConsumerTest {
-
     private static final String TRAIT_NAME = "MyTrait";
 
 
@@ -70,88 +65,62 @@ public class KafkaConsumerTest {
 
     @Test
     public void testReceive() throws Exception {
-
-
-        MessageAndMetadata<String, String> messageAndMetadata = 
mock(MessageAndMetadata.class);
-
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AbstractNotification.GSON.toJson(new 
AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 
0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
-
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
+        String                               json    = AtlasType.toV1Json(new 
AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message));
+        TopicPartition                       tp      = new 
TopicPartition("ATLAS_HOOK", 0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", 
json));
+        Map                                  mp      = 
Collections.singletonMap(tp, klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
         when(kafkaConsumer.poll(100)).thenReturn(records);
-        when(messageAndMetadata.message()).thenReturn(json);
 
+        kafkaConsumer.assign(Collections.singletonList(tp));
+
+        AtlasKafkaConsumer                        consumer    = new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
+        List<AtlasKafkaMessage<HookNotification>> messageList = 
consumer.receive();
 
-        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false, 100L);
-        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive();
         assertTrue(messageList.size() > 0);
 
-        HookNotification.HookNotificationMessage consumedMessage  = 
messageList.get(0).getMessage();
+        HookNotification consumedMessage  = messageList.get(0).getMessage();
 
         assertMessagesEqual(message, consumedMessage, entity);
-
     }
 
     @Test
     public void testNextVersionMismatch() throws Exception {
+        Referenceable                        entity  = getEntity(TRAIT_NAME);
+        EntityUpdateRequest                  message = new 
EntityUpdateRequest("user1", entity);
+        String                               json    = AtlasType.toV1Json(new 
AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
+        TopicPartition                       tp      = new 
TopicPartition("ATLAS_HOOK",0);
+        List<ConsumerRecord<String, String>> klist   = 
Collections.singletonList(new ConsumerRecord<>("ATLAS_HOOK", 0, 0L, "mykey", 
json));
+        Map                                  mp      = 
Collections.singletonMap(tp,klist);
+        ConsumerRecords                      records = new ConsumerRecords(mp);
 
-        MessageAndMetadata<String, String> messageAndMetadata = 
mock(MessageAndMetadata.class);
-
-        Referenceable entity = getEntity(TRAIT_NAME);
-
-        HookNotification.EntityUpdateRequest message =
-            new HookNotification.EntityUpdateRequest("user1", entity);
-
-        String json = AbstractNotification.GSON.toJson(new 
AtlasNotificationMessage<>(new MessageVersion("2.0.0"), message));
-
-        kafkaConsumer.assign(Arrays.asList(new TopicPartition("ATLAS_HOOK", 
0)));
-        List<ConsumerRecord> klist = new ArrayList<>();
-        klist.add(new ConsumerRecord<String, String>("ATLAS_HOOK",
-                0, 0L, "mykey", json));
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-        Map mp = new HashMap();
-        mp.put(tp,klist);
-        ConsumerRecords records = new ConsumerRecords(mp);
+        kafkaConsumer.assign(Collections.singletonList(tp));
 
         when(kafkaConsumer.poll(100L)).thenReturn(records);
-        when(messageAndMetadata.message()).thenReturn(json);
 
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer ,false, 100L);
+        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer ,false, 100L);
+
         try {
-            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive();
+            List<AtlasKafkaMessage<HookNotification>> messageList = 
consumer.receive();
+
             assertTrue(messageList.size() > 0);
 
-            HookNotification.HookNotificationMessage consumedMessage  = 
messageList.get(0).getMessage();
+            HookNotification consumedMessage  = 
messageList.get(0).getMessage();
 
             fail("Expected VersionMismatchException!");
         } catch (IncompatibleVersionException e) {
             e.printStackTrace();
         }
-
   }
 
 
     @Test
     public void testCommitIsCalledIfAutoCommitDisabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false, 100L);
+        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -160,10 +129,8 @@ public class KafkaConsumerTest {
 
     @Test
     public void testCommitIsNotCalledIfAutoCommitEnabled() {
-
-        TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
-
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, true , 100L);
+        TopicPartition     tp       = new TopicPartition("ATLAS_HOOK",0);
+        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);
 
@@ -171,26 +138,21 @@ public class KafkaConsumerTest {
     }
 
     private Referenceable getEntity(String traitName) {
-        Referenceable entity = EntityNotificationImplTest.getEntity("id");
-        List<IStruct> traitInfo = new LinkedList<>();
-        IStruct trait = new Struct(traitName, Collections.<String, 
Object>emptyMap());
-        traitInfo.add(trait);
-        return entity;
+        return EntityNotificationTest.getEntity("id", new Struct(traitName, 
Collections.<String, Object>emptyMap()));
     }
 
-    private void assertMessagesEqual(HookNotification.EntityUpdateRequest 
message,
-                                     HookNotification.HookNotificationMessage 
consumedMessage,
-                                     Referenceable entity) throws 
JSONException {
-
+    private void assertMessagesEqual(EntityUpdateRequest message,
+                                     HookNotification    consumedMessage,
+                                     Referenceable       entity) {
         assertEquals(consumedMessage.getType(), message.getType());
         assertEquals(consumedMessage.getUser(), message.getUser());
 
-        assertTrue(consumedMessage instanceof 
HookNotification.EntityUpdateRequest);
+        assertTrue(consumedMessage instanceof EntityUpdateRequest);
 
-        HookNotification.EntityUpdateRequest deserializedEntityUpdateRequest =
-            (HookNotification.EntityUpdateRequest) consumedMessage;
+        EntityUpdateRequest deserializedEntityUpdateRequest = 
(EntityUpdateRequest) consumedMessage;
 
         Referenceable deserializedEntity = 
deserializedEntityUpdateRequest.getEntities().get(0);
+
         assertEquals(deserializedEntity.getId(), entity.getId());
         assertEquals(deserializedEntity.getTypeName(), entity.getTypeName());
         assertEquals(deserializedEntity.getTraits(), entity.getTraits());

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
index 09e2e43..78d2a90 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -32,7 +33,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import scala.actors.threadpool.Arrays;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;

http://git-wip-us.apache.org/repos/asf/atlas/blob/435fe3fb/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index a1e13b9..e0655f3 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -19,29 +19,28 @@
 package org.apache.atlas.kafka;
 
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.hook.HookNotification;
-import org.apache.atlas.typesystem.Referenceable;
+import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
-import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import org.apache.atlas.model.notification.HookNotification;
 
 import java.util.List;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
 
 public class KafkaNotificationTest {
-
     private KafkaNotification kafkaNotification;
 
     @BeforeClass
     public void setup() throws Exception {
         Configuration properties = ApplicationProperties.get();
+
         properties.setProperty("atlas.kafka.data", "target/" + 
RandomStringUtils.randomAlphanumeric(5));
 
         kafkaNotification = new KafkaNotification(properties);
@@ -56,29 +55,27 @@ public class KafkaNotificationTest {
 
     @Test
     public void testReceiveKafkaMessages() throws Exception {
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u1", new 
Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u2", new 
Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u3", new 
Referenceable("type")));
-        kafkaNotification.send(NotificationInterface.NotificationType.HOOK,
-                new HookNotification.EntityCreateRequest("u4", new 
Referenceable("type")));
-
-        NotificationConsumer<Object> consumer =
-                
kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 
1).get(0);
-        List<AtlasKafkaMessage<Object>> messages = null ;
-        long startTime = System.currentTimeMillis(); //fetch starting time
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
new EntityCreateRequest("u1", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
new EntityCreateRequest("u2", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
new EntityCreateRequest("u3", new Referenceable("type")));
+        kafkaNotification.send(NotificationInterface.NotificationType.HOOK, 
new EntityCreateRequest("u4", new Referenceable("type")));
+
+        NotificationConsumer<Object>    consumer  = 
kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 
1).get(0);
+        List<AtlasKafkaMessage<Object>> messages  = null ;
+        long                            startTime = 
System.currentTimeMillis(); //fetch starting time
+
         while ((System.currentTimeMillis() - startTime) < 10000) {
              messages = consumer.receive();
+
             if (messages.size() > 0) {
                 break;
             }
         }
 
-        int i=1;
+        int i = 1;
         for (AtlasKafkaMessage<Object> msg :  messages){
-            HookNotification.HookNotificationMessage message =  
(HookNotificationMessage) msg.getMessage();
+            HookNotification message =  (HookNotification) msg.getMessage();
+
             assertEquals(message.getUser(), "u"+i++);
         }
 

Reply via email to