atlas git commit: ATLAS-2634: Avoid duplicate message processing.

2018-09-18 Thread madhan
Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 37f59dc95 -> 2f7348988


ATLAS-2634: Avoid duplicate message processing.

Signed-off-by: Ashutosh Mestry 
(cherry picked from commit f29a2b7bb2b555e68d7f5e2b43221f85877aa39c)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2f734898
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2f734898
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2f734898

Branch: refs/heads/branch-0.8
Commit: 2f7348988b992e8a9e5a71cf1a483803fa7d6db8
Parents: 37f59dc
Author: Ashutosh Mestry 
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Madhan Neethiraj 
Committed: Tue Sep 18 10:58:21 2018 -0700

--
 .../apache/atlas/kafka/KafkaNotification.java   |  3 +-
 .../notification/NotificationHookConsumer.java  | 42 +++-
 .../NotificationHookConsumerKafkaTest.java  | 67 
 3 files changed, 108 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
--
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 4c63027..4c753d2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -245,8 +245,9 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
 }
 
 
+@VisibleForTesting
 // Get properties for consumer request
-private Properties getConsumerProperties(NotificationType type) {
+public Properties getConsumerProperties(NotificationType type) {
 // find the configured group id for the given notification type
 String groupId = properties.getProperty(type.toString().toLowerCase() 
+ "." + CONSUMER_GROUP_ID_PROPERTY);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
--
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 88a8cce..1a567af 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -286,10 +286,13 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 private final List  
failedMessages = new ArrayList<>();
 private final AdaptiveWaiter
adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, 
minWaitDuration);
 
+@VisibleForTesting
+final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
 
 public HookConsumer(NotificationConsumer 
consumer) {
 super("atlas-hook-consumer-thread", false);
 this.consumer = consumer;
+failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
 }
 
 @Override
@@ -342,6 +345,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 }
 
 try {
+if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+commit(kafkaMsg);
+return;
+}
+
 // Used for intermediate conversions during create and update
 for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
 if (LOG.isDebugEnabled()) {
@@ -538,9 +546,17 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 }
 
 private void commit(AtlasKafkaMessage 
kafkaMessage) {
-recordFailedMessages();
-TopicPartition partition = new TopicPartition("ATLAS_HOOK", 
kafkaMessage.getPartition());
-consumer.commit(partition, kafkaMessage.getOffset() + 1);
+boolean commitSucceessStatus = false;
+try {
+recordFailedMessages();
+
+TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition());
+
+consumer.commit(partition, kafkaMessage.getOffset() + 1);
+commitSucceessStatus = true;
+} finally {
+failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset());
+}
 }
 
 boolean serverAvailable(Timer timer) {

atlas git commit: ATLAS-2634: Avoid duplicate message processing.

2018-05-04 Thread amestry
Repository: atlas
Updated Branches:
  refs/heads/master 015b8bf38 -> f29a2b7bb


ATLAS-2634: Avoid duplicate message processing.

Signed-off-by: Ashutosh Mestry 


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f29a2b7b
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f29a2b7b
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f29a2b7b

Branch: refs/heads/master
Commit: f29a2b7bb2b555e68d7f5e2b43221f85877aa39c
Parents: 015b8bf
Author: Ashutosh Mestry 
Authored: Thu May 3 16:22:10 2018 -0700
Committer: Ashutosh Mestry 
Committed: Fri May 4 15:54:17 2018 -0700

--
 .../apache/atlas/kafka/KafkaNotification.java   |  6 +-
 .../notification/NotificationHookConsumer.java  | 42 -
 .../NotificationHookConsumerKafkaTest.java  | 66 
 3 files changed, 109 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
--
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 80dc514..00e56e3 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -241,8 +241,10 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
 }
 
 
-// Get properties for consumer request
-private Properties getConsumerProperties(NotificationType type) {
+@VisibleForTesting
+public
+// Get properties for consumer request
+Properties getConsumerProperties(NotificationType type) {
 // find the configured group id for the given notification type
 String groupId = properties.getProperty(type.toString().toLowerCase() 
+ "." + CONSUMER_GROUP_ID_PROPERTY);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
--
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 7a4596a..f5e555d 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.filters.AuditFilter.AuditLog;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 private final List failedMessages = 
new ArrayList<>();
 private final AdaptiveWaiter adaptiveWaiter = 
new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration);
 
+@VisibleForTesting
+final FailedCommitOffsetRecorder failedCommitOffsetRecorder;
+
 public HookConsumer(NotificationConsumer consumer) {
 super("atlas-hook-consumer-thread", false);
 
 this.consumer = consumer;
+failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
 }
 
 @Override
@@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 }
 
 try {
+if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) {
+commit(kafkaMsg);
+return;
+}
+
 // Used for intermediate conversions during create and update
 for (int numRetries = 0; numRetries < maxRetries; 
numRetries++) {
 if (LOG.isDebugEnabled()) {
@@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 }
 
 private void commit(AtlasKafkaMessage kafkaMessage) {
-recordFailedMessages();
+boolean commitSucceessStatus = false;
+try {
+recordFailedMessages();
 
-TopicPartition partition = new TopicPartition("ATLAS_HOOK", 
kafkaMessage.getPartition());
+TopicPartition partition = new TopicPartition("ATLAS_HOOK",