This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a5009e  ATLAS-3071: updated stats/metrics module to collect notification metrics
1a5009e is described below

commit 1a5009ea7ec589e5809aae5ead8fac2467394f25
Author: Madhan Neethiraj <mad...@apache.org>
AuthorDate: Thu Mar 21 16:35:12 2019 -0700

    ATLAS-3071: updated stats/metrics module to collect notification metrics
    
    Co-authored-by: "lma <l...@cloudera.com>"
---
 .../org/apache/atlas/model/AtlasStatistics.java    |  79 ------
 .../apache/atlas/model/metrics/AtlasMetrics.java   |  57 ++++-
 .../org/apache/atlas/services/MetricsService.java  | 116 ++++-----
 .../org/apache/atlas/util/AtlasMetricsCounter.java | 268 ++++++++++++++++++++
 .../org/apache/atlas/util/AtlasMetricsUtil.java    | 271 ++++++++++++++++++++
 .../java/org/apache/atlas/util/StatisticsUtil.java | 274 ---------------------
 .../apache/atlas/services/MetricsServiceTest.java  | 115 ++++++++-
 .../notification/NotificationHookConsumer.java     | 107 ++++----
 .../web/service/ActiveInstanceElectorService.java  |  41 ++-
 .../NotificationHookConsumerKafkaTest.java         |  10 +-
 .../notification/NotificationHookConsumerTest.java |  32 +--
 .../service/ActiveInstanceElectorServiceTest.java  |  32 +--
 12 files changed, 866 insertions(+), 536 deletions(-)

diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
deleted file mode 100644
index 0ecbd9a..0000000
--- a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.model;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.databind.annotation.JsonSerialize;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE;
-import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY;
-
-/**
- * Atlas statistics
- */
-@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE)
-@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS)
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class AtlasStatistics {
-    public static final String STAT_SERVER_START_TS                = "Server:upFrom";
-    public static final String STAT_SERVER_ACTIVE_TS               = "Server:activateFrom";
-    public static final String STAT_SERVER_UP_SINCE                = "Server:upTime";
-    public static final String STAT_START_OFFSET                   = "Notification:ATLAS_HOOK:offsetStart";
-    public static final String STAT_CURRENT_OFFSET                 = "Notification:ATLAS_HOOK:offsetCurrent";
-    public static final String STAT_SOLR_STATUS                    = "ConnectionStatus:Solr";
-    public static final String STAT_HBASE_STATUS                   = "ConnectionStatus:HBase";
-    public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "Notification:ATLAS_HOOK:messageLastProcessedAt";
-    public static final String STAT_AVG_MESSAGE_PROCESSING_TIME    = "Notification:ATLAS_HOOK:messageAvgProcessingDuration";
-    public static final String STAT_MESSAGES_CONSUMED = "Notification:ATLAS_HOOK:messagesConsumed";
-
-    private Map<String, Object> data = new HashMap<>();
-
-    public Map<String, Object> getData() {
-        return data;
-    }
-
-    public void setData(Map<String, Object> data) {
-        this.data = data;
-    }
-
-    @Override
-    public String toString() {
-        return "AtlasStatistics{" +
-                "data=" + data +
-                '}';
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(data);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        AtlasStatistics other = (AtlasStatistics) o;
-
-        return Objects.equals(this.data, other.data);
-    }
-}
diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
index c3304cc..6f7c914 100644
--- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
+++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java
@@ -36,6 +36,51 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_
 @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
 @JsonIgnoreProperties(ignoreUnknown=true)
 public class AtlasMetrics {
+    public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:";
+    public static final String PREFIX_NOTIFICATION      = "Notification:";
+    public static final String PREFIX_SERVER            = "Server:";
+
+    public static final String STAT_CONNECTION_STATUS_BACKEND_STORE    = PREFIX_CONNECTION_STATUS + "backendStore";
+    public static final String STAT_CONNECTION_STATUS_INDEX_STORE      = PREFIX_CONNECTION_STATUS + "indexStore";
+    public static final String STAT_NOTIFY_COUNT_CURR_DAY              = PREFIX_NOTIFICATION + "currentDay";
+    public static final String STAT_NOTIFY_AVG_TIME_CURR_DAY           = PREFIX_NOTIFICATION + "currentDayAvgTime";
+    public static final String STAT_NOTIFY_CREATES_COUNT_CURR_DAY      = PREFIX_NOTIFICATION + "currentDayEntityCreates";
+    public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_DAY      = PREFIX_NOTIFICATION + "currentDayEntityUpdates";
+    public static final String STAT_NOTIFY_DELETES_COUNT_CURR_DAY      = PREFIX_NOTIFICATION + "currentDayEntityDeletes";
+    public static final String STAT_NOTIFY_FAILED_COUNT_CURR_DAY       = PREFIX_NOTIFICATION + "currentDayFailed";
+    public static final String STAT_NOTIFY_START_TIME_CURR_DAY         = PREFIX_NOTIFICATION + "currentDayStartTime";
+    public static final String STAT_NOTIFY_COUNT_CURR_HOUR             = PREFIX_NOTIFICATION + "currentHour";
+    public static final String STAT_NOTIFY_AVG_TIME_CURR_HOUR          = PREFIX_NOTIFICATION + "currentHourAvgTime";
+    public static final String STAT_NOTIFY_CREATES_COUNT_CURR_HOUR     = PREFIX_NOTIFICATION + "currentHourEntityCreates";
+    public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR     = PREFIX_NOTIFICATION + "currentHourEntityUpdates";
+    public static final String STAT_NOTIFY_DELETES_COUNT_CURR_HOUR     = PREFIX_NOTIFICATION + "currentHourEntityDeletes";
+    public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR      = PREFIX_NOTIFICATION + "currentHourFailed";
+    public static final String STAT_NOTIFY_START_TIME_CURR_HOUR        = PREFIX_NOTIFICATION + "currentHourStartTime";
+    public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime";
+    public static final String STAT_NOTIFY_START_OFFSET                = PREFIX_NOTIFICATION + "offsetStart";
+    public static final String STAT_NOTIFY_CURRENT_OFFSET              = PREFIX_NOTIFICATION + "offsetCurrent";
+    public static final String STAT_NOTIFY_COUNT_PREV_DAY              = PREFIX_NOTIFICATION + "previousDay";
+    public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY           = PREFIX_NOTIFICATION + "previousDayAvgTime";
+    public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY      = PREFIX_NOTIFICATION + "previousDayEntityCreates";
+    public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_DAY      = PREFIX_NOTIFICATION + "previousDayEntityUpdates";
+    public static final String STAT_NOTIFY_DELETES_COUNT_PREV_DAY      = PREFIX_NOTIFICATION + "previousDayEntityDeletes";
+    public static final String STAT_NOTIFY_FAILED_COUNT_PREV_DAY       = PREFIX_NOTIFICATION + "previousDayFailed";
+    public static final String STAT_NOTIFY_COUNT_PREV_HOUR             = PREFIX_NOTIFICATION + "previousHour";
+    public static final String STAT_NOTIFY_AVG_TIME_PREV_HOUR          = PREFIX_NOTIFICATION + "previousHourAvgTime";
+    public static final String STAT_NOTIFY_CREATES_COUNT_PREV_HOUR     = PREFIX_NOTIFICATION + "previousHourEntityCreates";
+    public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR     = PREFIX_NOTIFICATION + "previousHourEntityUpdates";
+    public static final String STAT_NOTIFY_DELETES_COUNT_PREV_HOUR     = PREFIX_NOTIFICATION + "previousHourEntityDeletes";
+    public static final String STAT_NOTIFY_FAILED_COUNT_PREV_HOUR      = PREFIX_NOTIFICATION + "previousHourFailed";
+    public static final String STAT_NOTIFY_COUNT_TOTAL                 = PREFIX_NOTIFICATION + "total";
+    public static final String STAT_NOTIFY_AVG_TIME_TOTAL              = PREFIX_NOTIFICATION + "totalAvgTime";
+    public static final String STAT_NOTIFY_CREATES_COUNT_TOTAL         = PREFIX_NOTIFICATION + "totalCreates";
+    public static final String STAT_NOTIFY_UPDATES_COUNT_TOTAL         = PREFIX_NOTIFICATION + "totalUpdates";
+    public static final String STAT_NOTIFY_DELETES_COUNT_TOTAL         = PREFIX_NOTIFICATION + "totalDeletes";
+    public static final String STAT_NOTIFY_FAILED_COUNT_TOTAL          = PREFIX_NOTIFICATION + "totalFailed";
+    public static final String STAT_SERVER_ACTIVE_TIMESTAMP            = PREFIX_SERVER + "activeTimeStamp";
+    public static final String STAT_SERVER_START_TIMESTAMP             = PREFIX_SERVER + "startTimeStamp";
+    public static final String STAT_SERVER_UP_TIME                     = PREFIX_SERVER + "upTime";
+
     private Map<String, Map<String, Object>> data;
 
     public AtlasMetrics() {
@@ -63,30 +108,38 @@ public class AtlasMetrics {
     @JsonIgnore
     public void addMetric(String groupKey, String key, Object value) {
         Map<String, Map<String, Object>> data = this.data;
+
         if (data == null) {
             data = new HashMap<>();
+
+            this.data = data;
         }
+
         Map<String, Object> metricMap = data.computeIfAbsent(groupKey, k -> new HashMap<>());
+
         metricMap.put(key, value);
-        setData(data);
     }
 
     @JsonIgnore
     public Number getNumericMetric(String groupKey, String key) {
         Object metric = getMetric(groupKey, key);
+
         return metric instanceof Number ? (Number) metric : null;
     }
 
     @JsonIgnore
     public Object getMetric(String groupKey, String key) {
+        Object                           ret  = null;
         Map<String, Map<String, Object>> data = this.data;
-        Object ret = null;
+
         if (data != null) {
             Map<String, Object> metricMap = data.get(groupKey);
+
             if (metricMap != null && !metricMap.isEmpty()) {
                 ret = metricMap.get(key);
             }
         }
+
         return ret;
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
index d9ea12a..8fb68e9 100644
--- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java
+++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java
@@ -18,15 +18,13 @@
 package org.apache.atlas.services;
 
 import org.apache.atlas.annotation.AtlasService;
-import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.instance.AtlasEntity.Status;
 import org.apache.atlas.model.metrics.AtlasMetrics;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
 import org.apache.atlas.type.AtlasTypeRegistry;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +51,8 @@ public class MetricsService {
     public static final String GENERAL = "general";
 
     // Query names
+    protected static final String METRIC_COLLECTION_TIME   = "collectionTime";
+    protected static final String METRIC_STATS             = "stats";
     protected static final String METRIC_TYPE_COUNT        = TYPE + "Count";
     protected static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount";
     protected static final String METRIC_ENTITY_COUNT      = ENTITY + "Count";
@@ -61,114 +61,90 @@ public class MetricsService {
     protected static final String METRIC_TAG_COUNT         = TAG + "Count";
     protected static final String METRIC_ENTITIES_PER_TAG  = TAG + "Entities";
 
-    public static final String METRIC_COLLECTION_TIME                = "collectionTime";
-
     private final AtlasGraph        atlasGraph;
     private final AtlasTypeRegistry typeRegistry;
-    private final StatisticsUtil    statisticsUtil;
+    private final AtlasMetricsUtil  metricsUtil;
     private final String            indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix();
 
     @Inject
-    public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) {
+    public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) {
         this.atlasGraph   = graph;
         this.typeRegistry = typeRegistry;
-        this.statisticsUtil = statisticsUtil;
+        this.metricsUtil  = metricsUtil;
     }
 
     @SuppressWarnings("unchecked")
     public AtlasMetrics getMetrics() {
-        AtlasMetrics metrics = new AtlasMetrics();
-
-        metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
-        metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
-
-        Map<String, Long> activeCountMap  = new HashMap<>();
-        Map<String, Long> deletedCountMap = new HashMap<>();
-
-        // metrics for classifications
+        Collection<String> entityDefNames         = typeRegistry.getAllEntityDefNames();
         Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames();
-
-        if (classificationDefNames != null) {
-            for (String classificationDefName : classificationDefNames) {
-                activeCountMap.put(classificationDefName, getTypeCount(classificationDefName, ACTIVE));
-            }
-        }
-
-        // metrics for entities
-        Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames();
+        Map<String, Long>  activeEntityCount      = new HashMap<>();
+        Map<String, Long>  deletedEntityCount     = new HashMap<>();
+        Map<String, Long>  taggedEntityCount      = new HashMap<>();
+        long               unusedTypeCount        = 0;
+        long               totalEntities          = 0;
 
         if (entityDefNames != null) {
             for (String entityDefName : entityDefNames) {
-                activeCountMap.put(entityDefName, getTypeCount(entityDefName, ACTIVE));
-                deletedCountMap.put(entityDefName, getTypeCount(entityDefName, DELETED));
+                long activeCount  = getTypeCount(entityDefName, ACTIVE);
+                long deletedCount = getTypeCount(entityDefName, DELETED);
+
+                if (activeCount > 0) {
+                    activeEntityCount.put(entityDefName, activeCount);
+                    totalEntities += activeCount;
+                }
+
+                if (deletedCount > 0) {
+                    deletedEntityCount.put(entityDefName, deletedCount);
+                    totalEntities += deletedCount;
+                }
+
+                if (activeCount == 0 && deletedCount == 0) {
+                    unusedTypeCount++;
+                }
             }
         }
 
-        Map<String, Long> activeEntityCount  = new HashMap<>();
-        Map<String, Long> deletedEntityCount = new HashMap<>();
-        long              unusedTypeCount    = 0;
-        long              totalEntities      = 0;
-
-        for (String entityDefName : typeRegistry.getAllEntityDefNames()) {
-            Long activeCount  = activeCountMap.get(entityDefName);
-            Long deletedCount = deletedCountMap.get(entityDefName);
-
-            if (activeCount > 0) {
-                activeEntityCount.put(entityDefName, activeCount);
-                totalEntities += activeCount.longValue();
-            }
-
-            if (deletedCount > 0) {
-                deletedEntityCount.put(entityDefName, deletedCount);
-                totalEntities += deletedCount.longValue();
-            }
+        if (classificationDefNames != null) {
+            for (String classificationDefName : classificationDefNames) {
+                long count = getTypeCount(classificationDefName, ACTIVE);
 
-            if (activeCount == 0 && deletedCount == 0) {
-                unusedTypeCount++;
+                if (count > 0) {
+                    taggedEntityCount.put(classificationDefName, count);
+                }
             }
         }
 
+        AtlasMetrics metrics = new AtlasMetrics();
+
+        metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, System.currentTimeMillis());
+        metrics.addMetric(GENERAL, METRIC_STATS, metricsUtil.getStats()); //add atlas server stats
+        metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount());
+        metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount());
         metrics.addMetric(GENERAL, METRIC_TYPE_UNUSED_COUNT, unusedTypeCount);
         metrics.addMetric(GENERAL, METRIC_ENTITY_COUNT, totalEntities);
+
         metrics.addMetric(ENTITY, METRIC_ENTITY_ACTIVE, activeEntityCount);
         metrics.addMetric(ENTITY, METRIC_ENTITY_DELETED, deletedEntityCount);
 
-        Map<String, Long> taggedEntityCount = new HashMap<>();
-
-        for (String classificationName : typeRegistry.getAllClassificationDefNames()) {
-            Long count = activeCountMap.get(classificationName);
-
-            if (count > 0) {
-                taggedEntityCount.put(classificationName, count);
-            }
-        }
-
         metrics.addMetric(TAG, METRIC_ENTITIES_PER_TAG, taggedEntityCount);
 
-        // Miscellaneous metrics
-        long collectionTime = System.currentTimeMillis();
-
-        metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime);
-
-        //add atlas server stats
-        Map<String, Object> statistics = statisticsUtil.getAtlasStatistics();
-        metrics.addMetric(GENERAL, "stats", statistics);
-
         return metrics;
     }
 
-    private Long getTypeCount(String typeName, Status status) {
+    private long getTypeCount(String typeName, Status status) {
+        Long   ret        = null;
         String indexQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\" : (%s)" + AND_STR +
                             indexSearchPrefix + "\"" + STATE_PROPERTY_KEY       + "\" : (%s)";
 
         indexQuery = String.format(indexQuery, typeName, status.name());
 
         try {
-            return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
+            ret = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals();
         }catch (Exception e){
             LOG.error("Failed fetching using indexQuery: " + e.getMessage());
         }
-        return 0l;
+
+        return ret == null ? 0L : ret;
     }
 
     private int getAllTypesCount() {
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
new file mode 100644
index 0000000..acf9e34
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.util;
+
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneOffset;
+
+import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
+
+public class AtlasMetricsCounter {
+    public enum Period { ALL, CURR_DAY, CURR_HOUR, PREV_HOUR, PREV_DAY };
+
+    private final String  name;
+    private final Stats   stats;
+    private       Clock   clock;
+    private       Instant lastIncrTime;
+    private       Instant dayStartTime;
+    private       Instant dayEndTime;
+    private       Instant hourStartTime;
+    private       Instant hourEndTime;
+
+    public AtlasMetricsCounter(String name) {
+        this(name, Clock.systemUTC());
+    }
+
+    public AtlasMetricsCounter(String name, Clock clock) {
+        this.name  = name;
+        this.stats = new Stats();
+
+        init(clock);
+    }
+
+    public String getName() { return name; }
+
+    public Instant getLastIncrTime() { return lastIncrTime; }
+
+    public void incr() {
+        incrByWithMeasure(1, 0);
+    }
+
+    public void incrBy(long count) {
+        incrByWithMeasure(count, 0);
+    }
+
+    public void incrWithMeasure(long measure) {
+        incrByWithMeasure(1, measure);
+    }
+
+    public void incrByWithMeasure(long count, long measure) {
+        Instant instant  = clock.instant();
+
+        stats.addCount(ALL, count);
+        stats.addMeasure(ALL, measure);
+
+        if (instant.isAfter(dayStartTime)) { // ignore times earlier than start of current day
+            lastIncrTime = instant;
+
+            updateForTime(instant);
+
+            stats.addCount(CURR_DAY, count);
+            stats.addMeasure(CURR_DAY, measure);
+
+            if (instant.isAfter(hourStartTime)) { // ignore times earlier than start of current hour
+                stats.addCount(CURR_HOUR, count);
+                stats.addMeasure(CURR_HOUR, measure);
+            }
+        }
+    }
+
+    public Stats report() {
+        updateForTime(clock.instant());
+
+        return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli());
+    }
+
+    // visible only for testing
+    void init(Clock clock) {
+        this.clock         = clock;
+        this.lastIncrTime  = Instant.ofEpochSecond(0);
+        this.dayStartTime  = Instant.ofEpochSecond(0);
+        this.dayEndTime    = Instant.ofEpochSecond(0);
+        this.hourStartTime = Instant.ofEpochSecond(0);
+        this.hourEndTime   = Instant.ofEpochSecond(0);
+
+        updateForTime(clock.instant());
+    }
+
+    protected void updateForTime(Instant instant) {
+        if (instant.isAfter(dayEndTime)) {
+            rolloverDay(instant);
+            rolloverHour(instant);
+        } else if (instant.isAfter(hourEndTime)) {
+            rolloverHour(instant);
+        }
+    }
+
+    protected void rolloverDay(Instant instant) {
+        Instant dayStartTime = getDayStartTime(instant);
+
+        if (dayStartTime.equals(dayEndTime)) {
+            stats.copy(CURR_DAY, PREV_DAY);
+        } else {
+            stats.reset(PREV_DAY);
+        }
+
+        stats.reset(CURR_DAY);
+
+        this.dayStartTime = dayStartTime;
+        this.dayEndTime   = getNextDayStartTime(instant);
+    }
+
+    protected void rolloverHour(Instant instant) {
+        Instant hourStartTime = getHourStartTime(instant);
+
+        if (hourStartTime.equals(hourEndTime)) {
+            stats.copy(CURR_HOUR, PREV_HOUR);
+        } else {
+            stats.reset(PREV_HOUR);
+        }
+
+        stats.reset(CURR_HOUR);
+
+        this.hourStartTime = hourStartTime;
+        this.hourEndTime   = getNextHourStartTime(instant);
+    }
+
+    public static LocalDateTime getLocalDateTime(Instant instant) {
+        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+
+    public static Instant getHourStartTime(Instant instant) {
+        LocalDateTime time = getLocalDateTime(instant);
+
+        return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour()).toInstant(ZoneOffset.UTC);
+    }
+
+    public static Instant getNextHourStartTime(Instant instant) {
+        LocalDateTime time = getLocalDateTime(instant);
+
+        return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour() + 1).toInstant(ZoneOffset.UTC);
+    }
+
+    public static Instant getDayStartTime(Instant instant) {
+        LocalDateTime time = getLocalDateTime(instant);
+
+        return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).toInstant(ZoneOffset.UTC);
+    }
+
+    public static Instant getNextDayStartTime(Instant instant) {
+        LocalDateTime time = getLocalDateTime(instant);
+
+        return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC);
+    }
+
+
+    public static class Stats {
+        private static final int NUM_PERIOD = Period.values().length;
+
+        private final long   dayStartTimeMs;
+        private final long   hourStartTimeMs;
+        private final long[] count           = new long[NUM_PERIOD];
+        private final long[] measureSum      = new long[NUM_PERIOD];
+        private final long[] measureMin      = new long[NUM_PERIOD];
+        private final long[] measureMax      = new long[NUM_PERIOD];
+
+
+        public Stats() {
+            dayStartTimeMs  = 0;
+            hourStartTimeMs = 0;
+
+            for (Period period : Period.values()) {
+                reset(period);
+            }
+        }
+
+        public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) {
+            this.dayStartTimeMs  = dayStartTimeMs;
+            this.hourStartTimeMs = hourStartTimeMs;
+
+            copy(other.count, this.count);
+            copy(other.measureSum, this.measureSum);
+            copy(other.measureMin, this.measureMin);
+            copy(other.measureMax, this.measureMax);
+        }
+
+        public long getDayStartTimeMs() { return dayStartTimeMs; }
+
+        public long getHourStartTimeMs() { return hourStartTimeMs; }
+
+        public long getCount(Period period) { return count[period.ordinal()]; }
+
+        public long getMeasureSum(Period period) { return measureSum[period.ordinal()]; }
+
+        public long getMeasureMin(Period period) { return measureMin[period.ordinal()]; }
+
+        public long getMeasureMax(Period period) { return measureMax[period.ordinal()]; }
+
+        public long getMeasureAvg(Period period) {
+            int  idx = period.ordinal();
+            long c   = count[idx];
+
+            return c != 0 ? (measureSum[idx] / c) : 0;
+        }
+
+        public void addCount(Period period, long num) {
+            count[period.ordinal()] += num;
+        }
+
+        public void addMeasure(Period period, long measure) {
+            int idx = period.ordinal();
+
+            measureSum[idx] += measure;
+
+            if (measureMin[idx] > measure) {
+                measureMin[idx] = measure;
+            }
+
+            if (measureMax[idx] < measure) {
+                measureMax[idx] = measure;
+            }
+        }
+
+        private void copy(Period src, Period dest) {
+            int srcIdx  = src.ordinal();
+            int destIdx = dest.ordinal();
+
+            count[destIdx]      = count[srcIdx];
+            measureSum[destIdx] = measureSum[srcIdx];
+            measureMin[destIdx] = measureMin[srcIdx];
+            measureMax[destIdx] = measureMax[srcIdx];
+        }
+
+        private void reset(Period period) {
+            int idx = period.ordinal();
+
+            count[idx]      = 0;
+            measureSum[idx] = 0;
+            measureMin[idx] = Long.MAX_VALUE;
+            measureMax[idx] = Long.MIN_VALUE;
+        }
+
+        private void copy(long[] src, long[] dest) {
+            for (int i = 0; i < dest.length; i++) {
+                dest[i] = src[i];
+            }
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
new file mode 100644
index 0000000..c41e6bd
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.util;
+
+import org.apache.atlas.model.instance.EntityMutationResponse;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
+import org.apache.atlas.util.AtlasMetricsCounter.Stats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.springframework.stereotype.Component;
+
+import javax.inject.Inject;
+import java.time.Clock;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.atlas.model.metrics.AtlasMetrics.*;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_INTERNAL;
+import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY;
+import static org.apache.atlas.util.AtlasMetricsCounter.Period.*;
+
+@Component
+public class AtlasMetricsUtil {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AtlasMetricsUtil.class);
+
+    private static final long   SEC_MS               = 1000;
+    private static final long   MIN_MS               =   60 * SEC_MS;
+    private static final long   HOUR_MS              =   60 * MIN_MS;
+    private static final long   DAY_MS               =   24 * HOUR_MS;
+    private static final String STATUS_CONNECTED     = "connected";
+    private static final String STATUS_NOT_CONNECTED = "not-connected";
+
+    private final    AtlasGraph          graph;
+
+    // The timestamps and offsets below are assigned by the server start-up /
+    // notification-processing code paths and read by getStats(), which may run
+    // on a different thread. 'volatile' guarantees that readers see the latest
+    // value and that the 64-bit writes are atomic.
+    private volatile long                serverStartTime   = 0;
+    private volatile long                serverActiveTime  = 0;
+    private volatile long                msgOffsetStart    = -1; // -1: no message processed yet
+    private volatile long                msgOffsetCurrent  = 0;
+
+    private final    AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed");
+    private final    AtlasMetricsCounter messagesFailed    = new AtlasMetricsCounter("messagesFailed");
+    private final    AtlasMetricsCounter entityCreates     = new AtlasMetricsCounter("entityCreates");
+    private final    AtlasMetricsCounter entityUpdates     = new AtlasMetricsCounter("entityUpdates");
+    private final    AtlasMetricsCounter entityDeletes     = new AtlasMetricsCounter("entityDeletes");
+
+    /**
+     * Creates the metrics helper.
+     *
+     * @param graph graph instance used by the backend-store and index-store
+     *              connectivity checks
+     */
+    @Inject
+    public AtlasMetricsUtil(AtlasGraph graph) {
+        this.graph = graph;
+    }
+
+    /**
+     * Initializes every notification counter with the supplied clock.
+     * Visible only for testing.
+     *
+     * @param clock clock handed to each counter's {@code init}
+     */
+    public void init(Clock clock) {
+        final AtlasMetricsCounter[] counters = {
+            messagesProcessed, messagesFailed, entityCreates, entityUpdates, entityDeletes
+        };
+
+        for (AtlasMetricsCounter counter : counters) {
+            counter.init(clock);
+        }
+    }
+
+    /**
+     * Records the current wall-clock time as the server start time; getStats()
+     * reports it and derives the server up-time from it.
+     */
+    public void onServerStart() {
+        serverStartTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Records the current wall-clock time as the server activation time, which
+     * getStats() reports.
+     */
+    public void onServerActivation() {
+        serverActiveTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Updates the notification counters after one message has been handled.
+     *
+     * The processed-message counter is incremented with the time taken, the
+     * entity create/update/delete counters are incremented by the numbers in
+     * {@code stats}, and the failed-message counter is incremented when the
+     * message is flagged as failed. The first offset seen is remembered as the
+     * start offset; the current offset is always set to {@code msgOffset + 1}.
+     *
+     * @param msgOffset offset of the message that was just processed
+     * @param stats     per-message statistics; must not be null
+     */
+    public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) {
+        messagesProcessed.incrWithMeasure(stats.timeTakenMs);
+        entityCreates.incrBy(stats.entityCreates);
+        entityUpdates.incrBy(stats.entityUpdates);
+        entityDeletes.incrBy(stats.entityDeletes);
+
+        if (stats.isFailedMsg) {
+            messagesFailed.incr();
+        }
+
+        // -1 marks "no message seen yet"
+        final boolean isFirstMessage = msgOffsetStart == -1;
+
+        if (isFirstMessage) {
+            msgOffsetStart = msgOffset;
+        }
+
+        msgOffsetCurrent = msgOffset + 1;
+    }
+
+    /**
+     * Builds a snapshot of server and notification statistics.
+     *
+     * The map holds server timestamps and up-time, connection status of the
+     * backend and index stores (each check may block up to its timeout),
+     * notification offsets, and per-period (total, current/previous day,
+     * current/previous hour) counts taken from the counters' reports.
+     *
+     * @return a new mutable map keyed by the {@code STAT_*} constants
+     */
+    public Map<String, Object> getStats() {
+        final Map<String, Object> stats = new HashMap<>();
+
+        // take one report per counter up-front so all periods of a counter are consistent
+        final Stats processed = messagesProcessed.report();
+        final Stats failed    = messagesFailed.report();
+        final Stats creates   = entityCreates.report();
+        final Stats updates   = entityUpdates.report();
+        final Stats deletes   = entityDeletes.report();
+
+        // server details
+        stats.put(STAT_SERVER_START_TIMESTAMP, serverStartTime);
+        stats.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime);
+        stats.put(STAT_SERVER_UP_TIME, millisToTimeDiff(System.currentTimeMillis() - serverStartTime));
+
+        final String backendStoreStatus = getHBaseStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
+
+        stats.put(STAT_CONNECTION_STATUS_BACKEND_STORE, backendStoreStatus);
+
+        final String indexStoreStatus = getSolrStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED;
+
+        stats.put(STAT_CONNECTION_STATUS_INDEX_STORE, indexStoreStatus);
+
+        // notification offsets and last-processed time
+        stats.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart);
+        stats.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent);
+        stats.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, messagesProcessed.getLastIncrTime().toEpochMilli());
+
+        // totals
+        stats.put(STAT_NOTIFY_COUNT_TOTAL,         processed.getCount(ALL));
+        stats.put(STAT_NOTIFY_AVG_TIME_TOTAL,      processed.getMeasureAvg(ALL));
+        stats.put(STAT_NOTIFY_FAILED_COUNT_TOTAL,  failed.getCount(ALL));
+        stats.put(STAT_NOTIFY_CREATES_COUNT_TOTAL, creates.getCount(ALL));
+        stats.put(STAT_NOTIFY_UPDATES_COUNT_TOTAL, updates.getCount(ALL));
+        stats.put(STAT_NOTIFY_DELETES_COUNT_TOTAL, deletes.getCount(ALL));
+
+        // current day
+        stats.put(STAT_NOTIFY_START_TIME_CURR_DAY,    processed.getDayStartTimeMs());
+        stats.put(STAT_NOTIFY_COUNT_CURR_DAY,         processed.getCount(CURR_DAY));
+        stats.put(STAT_NOTIFY_AVG_TIME_CURR_DAY,      processed.getMeasureAvg(CURR_DAY));
+        stats.put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY,  failed.getCount(CURR_DAY));
+        stats.put(STAT_NOTIFY_CREATES_COUNT_CURR_DAY, creates.getCount(CURR_DAY));
+        stats.put(STAT_NOTIFY_UPDATES_COUNT_CURR_DAY, updates.getCount(CURR_DAY));
+        stats.put(STAT_NOTIFY_DELETES_COUNT_CURR_DAY, deletes.getCount(CURR_DAY));
+
+        // current hour
+        stats.put(STAT_NOTIFY_START_TIME_CURR_HOUR,    processed.getHourStartTimeMs());
+        stats.put(STAT_NOTIFY_COUNT_CURR_HOUR,         processed.getCount(CURR_HOUR));
+        stats.put(STAT_NOTIFY_AVG_TIME_CURR_HOUR,      processed.getMeasureAvg(CURR_HOUR));
+        stats.put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR,  failed.getCount(CURR_HOUR));
+        stats.put(STAT_NOTIFY_CREATES_COUNT_CURR_HOUR, creates.getCount(CURR_HOUR));
+        stats.put(STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR, updates.getCount(CURR_HOUR));
+        stats.put(STAT_NOTIFY_DELETES_COUNT_CURR_HOUR, deletes.getCount(CURR_HOUR));
+
+        // previous hour
+        stats.put(STAT_NOTIFY_COUNT_PREV_HOUR,         processed.getCount(PREV_HOUR));
+        stats.put(STAT_NOTIFY_AVG_TIME_PREV_HOUR,      processed.getMeasureAvg(PREV_HOUR));
+        stats.put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR,  failed.getCount(PREV_HOUR));
+        stats.put(STAT_NOTIFY_CREATES_COUNT_PREV_HOUR, creates.getCount(PREV_HOUR));
+        stats.put(STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR, updates.getCount(PREV_HOUR));
+        stats.put(STAT_NOTIFY_DELETES_COUNT_PREV_HOUR, deletes.getCount(PREV_HOUR));
+
+        // previous day
+        stats.put(STAT_NOTIFY_COUNT_PREV_DAY,         processed.getCount(PREV_DAY));
+        stats.put(STAT_NOTIFY_AVG_TIME_PREV_DAY,      processed.getMeasureAvg(PREV_DAY));
+        stats.put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY,  failed.getCount(PREV_DAY));
+        stats.put(STAT_NOTIFY_CREATES_COUNT_PREV_DAY, creates.getCount(PREV_DAY));
+        stats.put(STAT_NOTIFY_UPDATES_COUNT_PREV_DAY, updates.getCount(PREV_DAY));
+        stats.put(STAT_NOTIFY_DELETES_COUNT_PREV_DAY, deletes.getCount(PREV_DAY));
+
+        return stats;
+    }
+
+    /**
+     * Checks connectivity to the backend store by issuing a small graph query
+     * with a 10 second timeout.
+     *
+     * NOTE(review): the query result is discarded; whether {@code vertices(1)}
+     * contacts the store eagerly is not visible here - confirm.
+     *
+     * @return true if the query completed in time without throwing, false otherwise
+     */
+    private boolean getHBaseStatus(){
+        try {
+            runWithTimeout(new Runnable() {
+                @Override
+                public void run() {
+                    graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1);
+                }
+            }, 10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // pass the exception itself: a TimeoutException has a null message,
+            // and the stack trace is needed to diagnose connection failures
+            LOG.error("getHBaseStatus(): backend store connectivity check failed", e);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Checks connectivity to the index store by issuing a one-row index query
+     * on the vertex index with a 10 second timeout.
+     *
+     * @return true if the query completed in time without throwing, false otherwise
+     */
+    private boolean getSolrStatus(){
+        final String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + Constants.TYPE_NAME_PROPERTY_KEY + "\":(" + TYPE_NAME_INTERNAL + ")";
+
+        try {
+            runWithTimeout(new Runnable() {
+                @Override
+                public void run() {
+                    graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1);
+                }
+            }, 10, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // pass the exception itself: a TimeoutException has a null message,
+            // and the stack trace is needed to diagnose connection failures
+            LOG.error("getSolrStatus(): index store connectivity check failed", e);
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Runs {@code runnable} on a separate thread and waits at most the given
+     * timeout for it to finish, by delegating to the {@code Callable} overload.
+     *
+     * @param runnable work to execute
+     * @param timeout  maximum time to wait
+     * @param timeUnit unit of {@code timeout}
+     * @throws Exception whatever the {@code Callable} overload throws
+     */
+    private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception {
+        // typed local so the call below resolves to the Callable overload
+        final Callable<Object> task = () -> {
+            runnable.run();
+
+            return null;
+        };
+
+        runWithTimeout(task, timeout, timeUnit);
+    }
+
+    /**
+     * Runs {@code callable} on a dedicated single-thread executor and waits at
+     * most the given timeout for its result.
+     *
+     * @param callable work to execute
+     * @param timeout  maximum time to wait
+     * @param timeUnit unit of {@code timeout}
+     * @param <T>      result type
+     * @return the value returned by {@code callable}
+     * @throws TimeoutException     if the work did not finish in time (the task is cancelled)
+     * @throws InterruptedException if the waiting thread was interrupted (the task is
+     *                              cancelled and the interrupt flag is restored)
+     * @throws Exception            the exception thrown by {@code callable}, unwrapped
+     */
+    private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        final Future<T>       future   = executor.submit(callable);
+
+        // no further tasks will be submitted; the worker thread exits once the task ends
+        executor.shutdown();
+
+        try {
+            return future.get(timeout, timeUnit);
+        } catch (TimeoutException e) {
+            future.cancel(true);
+
+            throw e;
+        } catch (InterruptedException e) {
+            // do not leave the task running, and keep the interrupt visible to callers
+            future.cancel(true);
+            Thread.currentThread().interrupt();
+
+            throw e;
+        } catch (ExecutionException e) {
+            // surface the task's own failure rather than the wrapper
+            Throwable t = e.getCause();
+
+            if (t instanceof Error) {
+                throw (Error) t;
+            } else if (t instanceof Exception) {
+                throw (Exception) t;
+            } else {
+                throw new IllegalStateException(t);
+            }
+        }
+    }
+
+    private String millisToTimeDiff(long msDiff) {
+        StringBuilder sb = new StringBuilder();
+
+        long diffSeconds = msDiff / SEC_MS % 60;
+        long diffMinutes = msDiff / MIN_MS % 60;
+        long diffHours   = msDiff / HOUR_MS % 24;
+        long diffDays    = msDiff / DAY_MS;
+
+        if (diffDays > 0) sb.append(diffDays).append(" day ");
+        if (diffHours > 0) sb.append(diffHours).append(" hour ");
+        if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
+        if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
+
+        return sb.toString();
+    }
+
+    /**
+     * Statistics gathered while processing a single notification message:
+     * whether it failed, how long it took, and how many entities it created,
+     * updated or deleted.
+     */
+    public static class NotificationStat {
+        public boolean isFailedMsg   = false;
+        public long    timeTakenMs   = 0;
+        public int     entityCreates = 0;
+        public int     entityUpdates = 0;
+        public int     entityDeletes = 0;
+
+        public NotificationStat() { }
+
+        /**
+         * @param isFailedMsg true if processing of the message failed
+         * @param timeTakenMs time taken to process the message, in milliseconds
+         */
+        public NotificationStat(boolean isFailedMsg, long timeTakenMs) {
+            this.isFailedMsg = isFailedMsg;
+            this.timeTakenMs = timeTakenMs;
+        }
+
+        /**
+         * Adds the entity counts of a mutation response to this stat. Both
+         * updated and partially-updated entities count as updates.
+         *
+         * @param response mutation response; a null response is ignored
+         */
+        public void updateStats(EntityMutationResponse response) {
+            if (response == null) {
+                return;
+            }
+
+            entityCreates += getSize(response.getCreatedEntities());
+            entityUpdates += getSize(response.getUpdatedEntities());
+            entityUpdates += getSize(response.getPartialUpdatedEntities());
+            entityDeletes += getSize(response.getDeletedEntities());
+        }
+
+        /** Returns the size of the collection, treating null as empty. */
+        private int getSize(Collection<?> collection) {
+            return collection != null ? collection.size() : 0;
+        }
+    }
+}
diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java 
b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
deleted file mode 100644
index efb804b..0000000
--- a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.atlas.util;
-
-import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.model.AtlasStatistics;
-import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
-import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.springframework.stereotype.Component;
-
-import javax.inject.Inject;
-import java.text.NumberFormat;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Locale;
-import java.util.concurrent.*;
-
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE;
-import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET;
-import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET;
-import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS;
-import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS;
-import static 
org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS;
-import static 
org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME;
-import static org.apache.atlas.model.AtlasStatistics.STAT_MESSAGES_CONSUMED;
-
-@Component
-public class StatisticsUtil {
-    private static final Logger LOG = 
LoggerFactory.getLogger(StatisticsUtil.class);
-
-    private static final SimpleDateFormat simpleDateFormat = new 
SimpleDateFormat("d MMM, yyyy : hh:mm aaa z");
-
-    private static final long DAY = 1000 * 60 * 60 * 24;
-    private static final long HOUR = 1000 * 60 * 60;
-    private static final long MIN = 1000 * 60;
-    private static final long SEC = 1000;
-
-    private final AtlasGraph graph;
-    private final String STATUS_CONNECTED = "connected";
-    private final String STATUS_NOT_CONNECTED = "not-connected";
-    private final AtlasStatistics atlasStatistics;
-
-    private long countMsgProcessed        = 0;
-    private long totalMsgProcessingTimeMs = 0;
-    private Locale locale                 = new Locale("en", "US");
-    private NumberFormat numberFormat;
-
-    @Inject
-    public StatisticsUtil(AtlasGraph graph) {
-        this.graph = graph;
-        this.atlasStatistics = new AtlasStatistics();
-        numberFormat = NumberFormat.getInstance(locale);
-    }
-
-    public Map<String, Object> getAtlasStatistics() {
-        Map<String, Object> statisticsMap = new HashMap<>();
-        statisticsMap.putAll(atlasStatistics.getData());
-
-        statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus());
-        statisticsMap.put(STAT_SOLR_STATUS, getSolrStatus());
-        statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime());
-        if(countMsgProcessed > 0) {
-            statisticsMap.put(STAT_MESSAGES_CONSUMED, countMsgProcessed);
-        }
-        formatStatistics(statisticsMap);
-
-        return statisticsMap;
-    }
-
-    public void setKafkaOffsets(long value) {
-        if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) {
-            addStat(STAT_START_OFFSET, value);
-        }
-        addStat(STAT_CURRENT_OFFSET, ++value);
-    }
-
-    public void setAvgMsgProcessingTime(long value) {
-        countMsgProcessed++;
-        totalMsgProcessingTimeMs += value;
-        value = totalMsgProcessingTimeMs / countMsgProcessed;
-
-        addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value);
-    }
-
-    public void setLastMsgProcessedTime() {
-        addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, 
System.currentTimeMillis());
-    }
-
-    public void setServerStartTime() {
-        addStat(STAT_SERVER_START_TS, System.currentTimeMillis());
-    }
-
-    public void setServerActiveTime() {
-        addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis());
-    }
-
-
-    private void addStat(String key, Object value) {
-        Map<String, Object> data = atlasStatistics.getData();
-        if (data == null) {
-            data = new HashMap<>();
-        }
-        data.put(key, value);
-        atlasStatistics.setData(data);
-    }
-
-    private Object getStat(String key) {
-        Map<String, Object> data = atlasStatistics.getData();
-        Object ret = data.get(key);
-        if (ret == null) {
-            return -1;
-        }
-        return ret;
-    }
-
-    private void formatStatistics(Map<String, Object> statisticsMap) {
-        for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) {
-            switch (stat.getKey()) {
-                case STAT_SERVER_UP_SINCE:
-                    statisticsMap.put(stat.getKey(), 
millisToTimeDiff(Long.parseLong(stat.getValue().toString())));
-                    break;
-
-                case STAT_LAST_MESSAGE_PROCESSED_TIME_TS:
-                    statisticsMap.put(stat.getKey(), 
millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
-                    break;
-
-                case STAT_SERVER_START_TS:
-                case STAT_SERVER_ACTIVE_TS:
-                    statisticsMap.put(stat.getKey(), 
millisToTimeStamp(Long.parseLong(stat.getValue().toString())));
-                    break;
-
-                case STAT_AVG_MESSAGE_PROCESSING_TIME:
-                    statisticsMap.put(stat.getKey(), 
formatNumber(Long.parseLong(stat.getValue().toString())) + " milliseconds");
-                    break;
-
-                case STAT_HBASE_STATUS:
-                case STAT_SOLR_STATUS:
-                    String curState = ((boolean) stat.getValue()) ? 
STATUS_CONNECTED : STATUS_NOT_CONNECTED;
-                    statisticsMap.put(stat.getKey(), curState);
-                    break;
-
-                case STAT_MESSAGES_CONSUMED:
-                case STAT_START_OFFSET:
-                case STAT_CURRENT_OFFSET:
-                    statisticsMap.put(stat.getKey(), 
formatNumber(Long.parseLong(stat.getValue().toString())));
-                    break;
-
-                default:
-                    statisticsMap.put(stat.getKey(), stat.getValue());
-            }
-        }
-    }
-
-    private boolean getHBaseStatus() {
-
-        String query = "g.V().next()";
-        try {
-            runWithTimeout(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        graph.executeGremlinScript(query, false);
-                    } catch (AtlasBaseException e) {
-                        LOG.error(e.getMessage());
-                    }
-                }
-            }, 10, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error(e.getMessage());
-            return false;
-        }
-
-        return true;
-    }
-
-    private boolean getSolrStatus() {
-        String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + 
"__type.name\"" + " : (*)";
-        try {
-            runWithTimeout(new Runnable() {
-                @Override
-                public void run() {
-                    graph.indexQuery(Constants.VERTEX_INDEX, 
query).vertexTotals();
-                }
-            }, 10, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error(e.getMessage());
-            return false;
-        }
-        return true;
-    }
-
-    private void runWithTimeout(final Runnable runnable, long timeout, 
TimeUnit timeUnit) throws Exception {
-        runWithTimeout(new Callable<Object>() {
-            @Override
-            public Object call() {
-                runnable.run();
-                return null;
-            }
-        }, timeout, timeUnit);
-    }
-
-    private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit 
timeUnit) throws Exception {
-        final ExecutorService executor = Executors.newSingleThreadExecutor();
-        final Future<T> future = executor.submit(callable);
-        executor.shutdown();
-        try {
-            return future.get(timeout, timeUnit);
-        } catch (TimeoutException e) {
-            future.cancel(true);
-            throw e;
-        } catch (ExecutionException e) {
-            Throwable t = e.getCause();
-            if (t instanceof Error) {
-                throw (Error) t;
-            } else if (t instanceof Exception) {
-                throw (Exception) t;
-            } else {
-                throw new IllegalStateException(t);
-            }
-        }
-    }
-
-    private long getUpSinceTime() {
-        long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString());
-        return System.currentTimeMillis() - upTS;
-    }
-
-    private String millisToTimeDiff(long msDiff) {
-        StringBuilder sb = new StringBuilder();
-
-        long diffSeconds = msDiff / SEC % 60;
-        long diffMinutes = msDiff / MIN % 60;
-        long diffHours = msDiff / HOUR % 24;
-        long diffDays = msDiff / DAY;
-
-        if (diffDays > 0) sb.append(diffDays).append(" day ");
-        if (diffHours > 0) sb.append(diffHours).append(" hour ");
-        if (diffMinutes > 0) sb.append(diffMinutes).append(" min ");
-        if (diffSeconds > 0) sb.append(diffSeconds).append(" sec");
-
-        return sb.toString();
-    }
-
-    private String millisToTimeStamp(long ms) {
-        return simpleDateFormat.format(ms);
-    }
-
-    private String formatNumber(long value) {
-        return numberFormat.format(value);
-    }
-
-}
diff --git 
a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java 
b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
index 78e5803..64698c2 100644
--- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
+++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java
@@ -28,6 +28,8 @@ import org.apache.atlas.repository.impexp.ZipSource;
 import org.apache.atlas.runner.LocalSolrRunner;
 import org.apache.atlas.store.AtlasTypeDefStore;
 import org.apache.atlas.type.AtlasTypeRegistry;
+import org.apache.atlas.util.AtlasMetricsCounter;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.testng.SkipException;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -37,10 +39,15 @@ import org.testng.annotations.Test;
 import javax.inject.Inject;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr;
+import static org.apache.atlas.model.metrics.AtlasMetrics.*;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson;
 import static 
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters;
 import static org.apache.atlas.services.MetricsService.ENTITY;
@@ -53,11 +60,11 @@ import static 
org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT;
 import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT;
 import static 
org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT;
 import static org.apache.atlas.services.MetricsService.TAG;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.*;
 
 @Guice(modules = TestModules.TestOnlyModule.class)
 public class MetricsServiceTest {
+
     public static final String IMPORT_FILE = "metrics-entities-data.zip";
 
     @Inject
@@ -72,6 +79,14 @@ public class MetricsServiceTest {
     @Inject
     private MetricsService metricsService;
 
+    @Inject
+    private AtlasMetricsUtil metricsUtil;
+
+    TestClock clock = new TestClock(Clock.systemUTC(), ZoneOffset.UTC);
+
+    long msgOffset = 0;
+
+
     private final Map<String, Long> activeEntityMetricsExpected = new 
HashMap<String, Long>() {{
         put("hive_storagedesc", 5L);
         put("__ExportImportAuditEntry", 1L);
@@ -95,6 +110,17 @@ public class MetricsServiceTest {
         put("PII", 1L);
     }};
 
+    private final Map<String, Object> metricExpected = new HashMap<String, 
Object>() {{
+        put(STAT_NOTIFY_COUNT_CURR_HOUR, 11L);
+        put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, 1L);
+        put(STAT_NOTIFY_COUNT_PREV_HOUR, 11L);
+        put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, 1L);
+        put(STAT_NOTIFY_COUNT_CURR_DAY, 33L);
+        put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, 3L);
+        put(STAT_NOTIFY_COUNT_PREV_DAY, 11L);
+        put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L);
+    }};
+
     @BeforeClass
     public void setup() {
         RequestContext.clear();
@@ -148,6 +174,24 @@ public class MetricsServiceTest {
         assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected);
     }
 
+    /**
+     * Feeds four batches of notifications (previous day, early today, previous
+     * hour, current hour - see prepareNotificationData) through metricsUtil
+     * using a controllable clock, then reads the stats one second before the
+     * end of today and compares the hourly/daily counts with metricExpected.
+     */
+    @Test
+    public void testNotificationMetrics() {
+        Instant now           = Clock.systemUTC().instant();
+        Instant dayStartTime  = AtlasMetricsCounter.getDayStartTime(now);
+        Instant dayEndTime    = AtlasMetricsCounter.getNextDayStartTime(now);
+        // the "current hour" of the test is the last hour of today
+        Instant hourStartTime = dayEndTime.minusSeconds(60 * 60);
+
+        prepareNotificationData(dayStartTime, hourStartTime);
+
+        // read the stats as of the last second of today
+        clock.setInstant(dayEndTime.minusSeconds(1));
+
+        Map<String, Object> notificationMetricMap = metricsUtil.getStats();
+
+        // null un-pins the test clock so it follows the base clock again
+        clock.setInstant(null);
+
+        verifyNotificationMetric(metricExpected, notificationMetricMap);
+    }
+
 
     private void loadModelFilesAndImportTestData() {
         try {
@@ -165,8 +209,75 @@ public class MetricsServiceTest {
         }
     }
 
+    /**
+     * Re-initializes metricsUtil with the test clock pinned to the start of the
+     * previous day, then processes one batch of messages at each of four
+     * instants: previous day, today, the hour before hourStartTime, and the
+     * hour starting at hourStartTime.
+     *
+     * @param dayStartTime  start of the day treated as "today"
+     * @param hourStartTime start of the hour treated as the "current hour"
+     */
+    private void prepareNotificationData(Instant dayStartTime, Instant 
hourStartTime) {
+        Instant prevDayStartTime = 
AtlasMetricsCounter.getDayStartTime(dayStartTime.minusSeconds(1));
+
+        msgOffset = 0;
+
+        // counters are initialized while the clock reads "start of previous day"
+        clock.setInstant(prevDayStartTime);
+        metricsUtil.init(clock);
+        clock.setInstant(null);
+
+        processMessage(prevDayStartTime.plusSeconds(3)); // yesterday
+        processMessage(dayStartTime.plusSeconds(3));     // today
+        processMessage(hourStartTime.minusSeconds(3));   // past hour
+        processMessage(hourStartTime.plusSeconds(3));    // this hour
+    }
+
+    /**
+     * Simulates processing of 11 messages at the given instant: one failed
+     * message followed by ten successful ones, each with a time taken of 1 ms.
+     *
+     * NOTE(review): the first call uses ++msgOffset and the loop uses
+     * msgOffset++, so the failed message and the first successful message are
+     * reported with the same offset - confirm whether that is intended.
+     *
+     * @param instant instant the test clock is pinned to while the messages are recorded
+     */
+    private void processMessage(Instant instant) {
+        clock.setInstant(instant);
+
+        metricsUtil.onNotificationProcessingComplete(++msgOffset, new 
AtlasMetricsUtil.NotificationStat(true, 1));
+
+        for (int i = 0; i < 10; i++) {
+            metricsUtil.onNotificationProcessingComplete(msgOffset++, new 
AtlasMetricsUtil.NotificationStat(false, 1));
+        }
+
+        // un-pin the clock
+        clock.setInstant(null);
+    }
+
+    private void verifyNotificationMetric(Map<String, Object> metricExpected, 
Map<String, Object> notificationMetrics) {
+        assertNotNull(notificationMetrics);
+        assertNotEquals(notificationMetrics.size(), 0);
+        assertTrue(notificationMetrics.size() >= metricExpected.size());
+
+        for (Map.Entry<String, Object> entry : metricExpected.entrySet()) {
+            assertEquals(notificationMetrics.get(entry.getKey()), 
entry.getValue(), entry.getKey());
+        }
+    }
+
     public static ZipSource getZipSource(String fileName) throws IOException, 
AtlasBaseException {
         FileInputStream fs = 
ZipFileResourceTestUtils.getFileInputStream(fileName);
         return new ZipSource(fs);
     }
+
+    private static class TestClock extends Clock {
+        private final Clock   baseClock;
+        private final ZoneId  zone;
+        private       Instant instant = null;
+
+        public TestClock(Clock baseClock, ZoneId zone) {
+            this.baseClock = baseClock;
+            this.zone      = zone;
+        }
+
+        @Override
+        public ZoneId getZone() {
+            return zone;
+        }
+
+        @Override
+        public TestClock withZone(ZoneId zone) {
+            return new TestClock(baseClock, zone);
+        }
+
+        @Override
+        public Instant instant() {
+            return instant != null ? instant : baseClock.instant();
+        }
+
+        public void setInstant(Instant instant) {
+            this.instant = instant;
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 8430fd4..ce2d76f 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -47,7 +47,8 @@ import 
org.apache.atlas.notification.preprocessor.EntityPreprocessor;
 import org.apache.atlas.notification.preprocessor.PreprocessorContext;
 import 
org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction;
 import org.apache.atlas.utils.LruCache;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil.NotificationStat;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest;
@@ -140,7 +141,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final ServiceState                  serviceState;
     private final AtlasInstanceConverter        instanceConverter;
     private final AtlasTypeRegistry             typeRegistry;
-    private final StatisticsUtil                statisticsUtil;
+    private final AtlasMetricsUtil              metricsUtil;
     private final int                           maxRetries;
     private final int                           failedMsgCacheSize;
     private final int                           minWaitDuration;
@@ -156,10 +157,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private final boolean                       hiveTypesRemoveOwnedRefAttrs;
     private final boolean                       rdbmsTypesRemoveOwnedRefAttrs;
     private final boolean                       preprocessEnabled;
-
-    private NotificationInterface notificationInterface;
-    private ExecutorService       executors;
-    private Configuration         applicationProperties;
+    private final NotificationInterface         notificationInterface;
+    private final Configuration                 applicationProperties;
+    private       ExecutorService               executors;
 
     @VisibleForTesting
     final int consumerRetryInterval;
@@ -170,14 +170,14 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     @Inject
     public NotificationHookConsumer(NotificationInterface 
notificationInterface, AtlasEntityStore atlasEntityStore,
                                     ServiceState serviceState, 
AtlasInstanceConverter instanceConverter,
-                                    AtlasTypeRegistry typeRegistry, 
StatisticsUtil statisticsUtil) throws AtlasException {
+                                    AtlasTypeRegistry typeRegistry, 
AtlasMetricsUtil metricsUtil) throws AtlasException {
         this.notificationInterface = notificationInterface;
         this.atlasEntityStore      = atlasEntityStore;
         this.serviceState          = serviceState;
         this.instanceConverter     = instanceConverter;
         this.typeRegistry          = typeRegistry;
         this.applicationProperties = ApplicationProperties.get();
-        this.statisticsUtil        = statisticsUtil;
+        this.metricsUtil           = metricsUtil;
 
         maxRetries            = 
applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
         failedMsgCacheSize    = 
applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);
@@ -475,12 +475,12 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
         @VisibleForTesting
         void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) 
throws AtlasServiceException, AtlasException {
-            AtlasPerfTracer  perf        = null;
-            HookNotification message     = kafkaMsg.getMessage();
-            String           messageUser = message.getUser();
-            long             startTime   = System.currentTimeMillis();
-            boolean          isFailedMsg = false;
-            AuditLog         auditLog = null;
+            AtlasPerfTracer  perf           = null;
+            HookNotification message        = kafkaMsg.getMessage();
+            String           messageUser    = message.getUser();
+            long             startTime      = System.currentTimeMillis();
+            NotificationStat stats          = new NotificationStat();
+            AuditLog         auditLog       = null;
 
             if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                 perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, 
message.getType().name());
@@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false, context);
+                                createOrUpdate(entities, false, stats, 
context);
                             }
                             break;
 
@@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                 // There should only be one root entity
                                 entities.getEntities().get(0).setGuid(guid);
 
-                                createOrUpdate(entities, true, context);
+                                createOrUpdate(entities, true, stats, context);
                             }
                             break;
 
@@ -562,7 +562,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                 try {
                                     AtlasEntityType type = (AtlasEntityType) 
typeRegistry.getType(deleteRequest.getTypeName());
 
-                                    
atlasEntityStore.deleteByUniqueAttributes(type, 
Collections.singletonMap(deleteRequest.getAttribute(), (Object) 
deleteRequest.getAttributeValue()));
+                                    EntityMutationResponse response = 
atlasEntityStore.deleteByUniqueAttributes(type, 
Collections.singletonMap(deleteRequest.getAttribute(), (Object) 
deleteRequest.getAttributeValue()));
+
+                                    stats.updateStats(response);
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to delete entity {}", 
deleteRequest);
                                 }
@@ -579,7 +581,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false, context);
+                                createOrUpdate(entities, false, stats, 
context);
                             }
                             break;
 
@@ -593,7 +595,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false, context);
+                                createOrUpdate(entities, false, stats, 
context);
                             }
                             break;
 
@@ -608,7 +610,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                atlasEntityStore.updateEntity(entityId, 
entity, true);
+                                EntityMutationResponse response = 
atlasEntityStore.updateEntity(entityId, entity, true);
+
+                                stats.updateStats(response);
                             }
                             break;
 
@@ -622,7 +626,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                                                             
AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath());
                                 }
 
-                                createOrUpdate(entities, false, context);
+                                createOrUpdate(entities, false, stats, 
context);
                             }
                             break;
 
@@ -640,7 +644,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                                         AtlasEntityType type = 
(AtlasEntityType) typeRegistry.getType(entity.getTypeName());
 
-                                        
atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
+                                        EntityMutationResponse response = 
atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes());
+
+                                        stats.updateStats(response);
                                     }
                                 } catch (ClassCastException cle) {
                                     LOG.error("Failed to do delete entities 
{}", entities);
@@ -661,7 +667,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                             LOG.warn("Max retries exceeded for message {}", 
strMessage, e);
 
-                            isFailedMsg = true;
+                            stats.isFailedMsg = true;
 
                             failedMessages.add(strMessage);
 
@@ -689,33 +695,34 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             } finally {
                 AtlasPerfTracer.log(perf);
 
-                long msgProcessingTime = System.currentTimeMillis() - 
startTime;
+                stats.timeTakenMs = System.currentTimeMillis() - startTime;
+
+                
metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats);
 
-                if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) 
{
+                if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) 
{
                     String strMessage = 
AbstractNotification.getMessageJson(message);
 
-                    LOG.warn("msgProcessingTime={}, msgSize={}, 
topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset());
-                    
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
 msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage);
+                    LOG.warn("msgProcessingTime={}, msgSize={}, 
topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset());
+                    
LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}",
 stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage);
                 }
 
                 if (auditLog != null) {
-                    auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : 
SC_OK);
-                    auditLog.setTimeTaken(msgProcessingTime);
+                    auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST 
: SC_OK);
+                    auditLog.setTimeTaken(stats.timeTakenMs);
 
                     AuditFilter.audit(auditLog);
                 }
-                statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime);
             }
         }
 
-        private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean 
isPartialUpdate, PreprocessorContext context) throws AtlasBaseException {
+        private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean 
isPartialUpdate, NotificationStat stats, PreprocessorContext context) throws 
AtlasBaseException {
             List<AtlasEntity> entitiesList = entities.getEntities();
             AtlasEntityStream entityStream = new AtlasEntityStream(entities);
 
             if (commitBatchSize <= 0 || entitiesList.size() <= 
commitBatchSize) {
                 EntityMutationResponse response = 
atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate);
 
-                recordProcessedEntities(response, context);
+                recordProcessedEntities(response, stats, context);
             } else {
                 for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx 
+= commitBatchSize) {
                     int toIndex = fromIdx + commitBatchSize;
@@ -733,7 +740,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                     EntityMutationResponse response = 
atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate);
 
-                    recordProcessedEntities(response, context);
+                    recordProcessedEntities(response, stats, context);
 
                     RequestContext.get().resetEntityGuidUpdates();
 
@@ -770,8 +777,6 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                 consumer.commit(partition, kafkaMessage.getOffset() + 1);
                 commitSucceessStatus = true;
-                statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset());
-                statisticsUtil.setLastMsgProcessedTime();
             } finally {
                 
failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, 
kafkaMessage.getOffset());
             }
@@ -1021,24 +1026,30 @@ public class NotificationHookConsumer implements 
Service, ActiveStateChangeHandl
         return ret;
     }
 
-    private void recordProcessedEntities(EntityMutationResponse 
mutationResponse, PreprocessorContext context) {
-        if (mutationResponse != null && context != null) {
-            if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
-                
context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+    private void recordProcessedEntities(EntityMutationResponse 
mutationResponse, NotificationStat stats, PreprocessorContext context) {
+        if (mutationResponse != null) {
+            if (stats != null) {
+                stats.updateStats(mutationResponse);
             }
 
-            if 
(CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
-                for (AtlasEntityHeader entity : 
mutationResponse.getCreatedEntities()) {
-                    if (entity != null && entity.getGuid() != null) {
-                        context.getCreatedEntities().add(entity.getGuid());
+            if (context != null) {
+                if 
(MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) {
+                    
context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments());
+                }
+
+                if 
(CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) {
+                    for (AtlasEntityHeader entity : 
mutationResponse.getCreatedEntities()) {
+                        if (entity != null && entity.getGuid() != null) {
+                            context.getCreatedEntities().add(entity.getGuid());
+                        }
                     }
                 }
-            }
 
-            if 
(CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
-                for (AtlasEntityHeader entity : 
mutationResponse.getDeletedEntities()) {
-                    if (entity != null && entity.getGuid() != null) {
-                        context.getDeletedEntities().add(entity.getGuid());
+                if 
(CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) {
+                    for (AtlasEntityHeader entity : 
mutationResponse.getDeletedEntities()) {
+                        if (entity != null && entity.getGuid() != null) {
+                            context.getDeletedEntities().add(entity.getGuid());
+                        }
                     }
                 }
             }
diff --git 
a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
 
b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index 7887afb..10081ac 100644
--- 
a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ 
b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -23,7 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.service.Service;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
@@ -55,18 +55,17 @@ import java.util.Set;
 @Component
 @Order(1)
 public class ActiveInstanceElectorService implements Service, 
LeaderLatchListener {
-
     private static final Logger LOG = 
LoggerFactory.getLogger(ActiveInstanceElectorService.class);
 
-    private final Configuration configuration;
-    private final ServiceState serviceState;
-    private final ActiveInstanceState activeInstanceState;
-    private final StatisticsUtil statisticsUtil;
-    private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
-    private List<ActiveStateChangeHandler> activeStateChangeHandlers;
-    private CuratorFactory curatorFactory;
-    private LeaderLatch leaderLatch;
-    private String serverId;
+    private final Configuration                  configuration;
+    private final ServiceState                   serviceState;
+    private final ActiveInstanceState            activeInstanceState;
+    private final AtlasMetricsUtil               metricsUtil;
+    private       Set<ActiveStateChangeHandler>  
activeStateChangeHandlerProviders;
+    private       List<ActiveStateChangeHandler> activeStateChangeHandlers;
+    private       CuratorFactory                 curatorFactory;
+    private       LeaderLatch                    leaderLatch;
+    private       String                         serverId;
 
     /**
      * Create a new instance of {@link ActiveInstanceElectorService}
@@ -78,14 +77,14 @@ public class ActiveInstanceElectorService implements 
Service, LeaderLatchListene
     ActiveInstanceElectorService(Configuration configuration,
                                  Set<ActiveStateChangeHandler> 
activeStateChangeHandlerProviders,
                                  CuratorFactory curatorFactory, 
ActiveInstanceState activeInstanceState,
-                                 ServiceState serviceState, StatisticsUtil 
statisticsUtil) {
-        this.configuration = configuration;
+                                 ServiceState serviceState, AtlasMetricsUtil 
metricsUtil) {
+        this.configuration                     = configuration;
         this.activeStateChangeHandlerProviders = 
activeStateChangeHandlerProviders;
-        this.activeStateChangeHandlers = new ArrayList<>();
-        this.curatorFactory = curatorFactory;
-        this.activeInstanceState = activeInstanceState;
-        this.serviceState = serviceState;
-        this.statisticsUtil = statisticsUtil;
+        this.activeStateChangeHandlers         = new ArrayList<>();
+        this.curatorFactory                    = curatorFactory;
+        this.activeInstanceState               = activeInstanceState;
+        this.serviceState                      = serviceState;
+        this.metricsUtil                       = metricsUtil;
     }
 
     /**
@@ -96,9 +95,9 @@ public class ActiveInstanceElectorService implements Service, 
LeaderLatchListene
      */
     @Override
     public void start() throws AtlasException {
-        statisticsUtil.setServerStartTime();
+        metricsUtil.onServerStart();
         if (!HAConfiguration.isHAEnabled(configuration)) {
-            statisticsUtil.setServerActiveTime();
+            metricsUtil.onServerActivation();
             LOG.info("HA is not enabled, no need to start leader election 
service");
             return;
         }
@@ -156,7 +155,7 @@ public class ActiveInstanceElectorService implements 
Service, LeaderLatchListene
             }
             activeInstanceState.update(serverId);
             serviceState.setActive();
-            statisticsUtil.setServerActiveTime();
+            metricsUtil.onServerActivation();
         } catch (Exception e) {
             LOG.error("Got exception while activating", e);
             notLeader();
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index c7ba699..fb3ff26 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -25,7 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.kafka.*;
 import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.notification.HookNotification;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import org.apache.atlas.v1.model.notification.HookNotificationV1;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -82,7 +82,7 @@ public class NotificationHookConsumerKafkaTest {
     private AtlasTypeRegistry typeRegistry;
 
     @Mock
-    private StatisticsUtil statisticsUtil;
+    private AtlasMetricsUtil metricsUtil;
 
     @BeforeTest
     public void setup() throws AtlasException, InterruptedException, 
AtlasBaseException {
@@ -108,7 +108,7 @@ public class NotificationHookConsumerKafkaTest {
         produceMessage(new 
HookNotificationV1.EntityCreateRequest("test_user1", createEntity()));
 
         NotificationConsumer<HookNotification> consumer                 = 
createNewConsumer(kafkaNotification, false);
-        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer  hookConsumer             = 
notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
@@ -127,7 +127,7 @@ public class NotificationHookConsumerKafkaTest {
     public void 
consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() 
throws AtlasException, InterruptedException, AtlasBaseException {
 
         ExceptionThrowingCommitConsumer        consumer                 = 
createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true);
-        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer  hookConsumer             = 
notificationHookConsumer.new HookConsumer(consumer);
         NotificationHookConsumer.FailedCommitOffsetRecorder 
failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder;
 
@@ -163,7 +163,7 @@ public class NotificationHookConsumerKafkaTest {
 
         assertNotNull (consumer);
 
-        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer hookConsumer             = 
notificationHookConsumer.new HookConsumer(consumer);
 
         consumeOneMessage(consumer, hookConsumer);
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index de316b6..3e35511 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -26,7 +26,7 @@ import 
org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
 import org.apache.atlas.model.instance.EntityMutationResponse;
 import 
org.apache.atlas.model.notification.HookNotification.HookNotificationType;
 import org.apache.atlas.notification.NotificationInterface.NotificationType;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.apache.atlas.v1.model.instance.Referenceable;
 import 
org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
@@ -77,7 +77,7 @@ public class NotificationHookConsumerTest {
     private AtlasTypeRegistry typeRegistry;
 
     @Mock
-    private StatisticsUtil statisticsUtil;
+    private AtlasMetricsUtil metricsUtil;
 
     @BeforeMethod
     public void setup() throws AtlasBaseException {
@@ -96,7 +96,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerCanProceedIfServerIsReady() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer hookConsumer             = 
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = 
mock(NotificationHookConsumer.Timer.class);
 
@@ -109,7 +109,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws 
Exception {
-        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer hookConsumer             = 
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = 
mock(NotificationHookConsumer.Timer.class);
 
@@ -126,7 +126,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsCalledWhenMessageIsProcessed() throws 
AtlasServiceException, AtlasException {
-        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer               notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationConsumer                   consumer                 = 
mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer  hookConsumer             = 
notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                    message                  = 
mock(EntityCreateRequest.class);
@@ -143,7 +143,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws 
AtlasServiceException, AtlasException, AtlasBaseException {
-        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationConsumer                  consumer                 = 
mock(NotificationConsumer.class);
         NotificationHookConsumer.HookConsumer hookConsumer             = 
notificationHookConsumer.new HookConsumer(consumer);
         EntityCreateRequest                   message                  = new 
EntityCreateRequest("user", 
Collections.singletonList(mock(Referenceable.class)));
@@ -157,7 +157,7 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws Exception {
-        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer              notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         NotificationHookConsumer.HookConsumer hookConsumer             = 
notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class));
         NotificationHookConsumer.Timer        timer                    = 
mock(NotificationHookConsumer.Timer.class);
 
@@ -177,9 +177,7 @@ public class NotificationHookConsumerTest {
         
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, 
false)).thenReturn(false);
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
-
-        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
-
+        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
         notificationHookConsumer.startInternal(configuration, executorService);
 
         verify(notificationInterface).createConsumers(NotificationType.HOOK, 
1);
@@ -197,8 +195,7 @@ public class NotificationHookConsumerTest {
         
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true);
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
-
-        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
 
         notificationHookConsumer.startInternal(configuration, executorService);
 
@@ -217,7 +214,7 @@ public class NotificationHookConsumerTest {
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
 
-        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsActive();
@@ -237,8 +234,7 @@ public class NotificationHookConsumerTest {
         
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, 
false)).thenReturn(true);
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
-
-        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
 
         doAnswer(new Answer() {
             @Override
@@ -269,8 +265,7 @@ public class NotificationHookConsumerTest {
         
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, 
false)).thenReturn(true);
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
-
-        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, statisticsUtil);
+        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry, metricsUtil);
 
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
@@ -335,7 +330,6 @@ public class NotificationHookConsumerTest {
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         when(notificationConsumerMock.receive()).thenThrow(new 
IllegalStateException());
         when(notificationInterface.createConsumers(NotificationType.HOOK, 
1)).thenReturn(consumers);
-
-        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil);
+        return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil);
     }
 }
diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
index 0fe3eba..3ce0c4b 100644
--- a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
+++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java
@@ -23,7 +23,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
-import org.apache.atlas.util.StatisticsUtil;
+import org.apache.atlas.util.AtlasMetricsUtil;
 import org.apache.commons.configuration.Configuration;
 import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.mockito.InOrder;
@@ -53,7 +53,7 @@ public class ActiveInstanceElectorServiceTest {
     private ServiceState serviceState;
 
     @Mock
-    private StatisticsUtil statisticsUtil;
+    private AtlasMetricsUtil metricsUtil;
 
     @BeforeMethod
     public void setup() {
@@ -75,7 +75,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
 
         verify(leaderLatch).start();
@@ -96,7 +96,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
 
         verify(leaderLatch).addListener(activeInstanceElectorService);
@@ -108,7 +108,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
 
         verifyZeroInteractions(curatorFactory);
@@ -129,7 +129,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.stop();
 
@@ -151,7 +151,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.stop();
 
@@ -165,7 +165,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.stop();
 
         verifyZeroInteractions(curatorFactory);
@@ -193,7 +193,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, 
changeHandlers, curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.isLeader();
 
@@ -216,7 +216,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
 
         activeInstanceElectorService.start();
         activeInstanceElectorService.isLeader();
@@ -249,7 +249,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, 
changeHandlers, curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.isLeader();
 
@@ -275,7 +275,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(), curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
 
         activeInstanceElectorService.start();
         activeInstanceElectorService.isLeader();
@@ -310,7 +310,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, 
changeHandlers, curatorFactory,
-                        activeInstanceState, serviceState, statisticsUtil);
+                        activeInstanceState, serviceState, metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.notLeader();
 
@@ -322,7 +322,7 @@ public class ActiveInstanceElectorServiceTest {
     public void testActiveStateSetOnBecomingLeader() {
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(),
-                        curatorFactory, activeInstanceState, serviceState, 
statisticsUtil);
+                        curatorFactory, activeInstanceState, serviceState, 
metricsUtil);
 
         activeInstanceElectorService.isLeader();
 
@@ -335,7 +335,7 @@ public class ActiveInstanceElectorServiceTest {
     public void testPassiveStateSetOnLoosingLeadership() {
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(),
-                        curatorFactory, activeInstanceState, serviceState, 
statisticsUtil);
+                        curatorFactory, activeInstanceState, serviceState, 
metricsUtil);
 
         activeInstanceElectorService.notLeader();
 
@@ -362,7 +362,7 @@ public class ActiveInstanceElectorServiceTest {
 
         ActiveInstanceElectorService activeInstanceElectorService =
                 new ActiveInstanceElectorService(configuration, new 
HashSet<ActiveStateChangeHandler>(),
-                        curatorFactory, activeInstanceState, serviceState, 
statisticsUtil);
+                        curatorFactory, activeInstanceState, serviceState, 
metricsUtil);
         activeInstanceElectorService.start();
         activeInstanceElectorService.isLeader();
 

Reply via email to