atlas git commit: ATLAS-2634: Avoid duplicate message processing.
Repository: atlas Updated Branches: refs/heads/branch-0.8 37f59dc95 -> 2f7348988 ATLAS-2634: Avoid duplicate message processing. Signed-off-by: Ashutosh Mestry (cherry picked from commit f29a2b7bb2b555e68d7f5e2b43221f85877aa39c) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/2f734898 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/2f734898 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/2f734898 Branch: refs/heads/branch-0.8 Commit: 2f7348988b992e8a9e5a71cf1a483803fa7d6db8 Parents: 37f59dc Author: Ashutosh Mestry Authored: Thu May 3 16:22:10 2018 -0700 Committer: Madhan Neethiraj Committed: Tue Sep 18 10:58:21 2018 -0700 -- .../apache/atlas/kafka/KafkaNotification.java | 3 +- .../notification/NotificationHookConsumer.java | 42 +++- .../NotificationHookConsumerKafkaTest.java | 67 3 files changed, 108 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java -- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 4c63027..4c753d2 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -245,8 +245,9 @@ public class KafkaNotification extends AbstractNotification implements Service { } +@VisibleForTesting // Get properties for consumer request -private Properties getConsumerProperties(NotificationType type) { +public Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type String groupId = properties.getProperty(type.toString().toLowerCase() + "." 
+ CONSUMER_GROUP_ID_PROPERTY); http://git-wip-us.apache.org/repos/asf/atlas/blob/2f734898/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java -- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 88a8cce..1a567af 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -286,10 +286,13 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); +@VisibleForTesting +final FailedCommitOffsetRecorder failedCommitOffsetRecorder; public HookConsumer(NotificationConsumer consumer) { super("atlas-hook-consumer-thread", false); this.consumer = consumer; +failedCommitOffsetRecorder = new FailedCommitOffsetRecorder(); } @Override @@ -342,6 +345,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } try { + if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) { +commit(kafkaMsg); +return; +} + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -538,9 +546,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void commit(AtlasKafkaMessage kafkaMessage) { -recordFailedMessages(); -TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); -consumer.commit(partition, kafkaMessage.getOffset() + 1); +boolean commitSucceessStatus = false; +try { +recordFailedMessages(); + +TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + 
+consumer.commit(partition, kafkaMessage.getOffset() + 1); +commitSucceessStatus = true; +} finally { + failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); +} } boolean serverAvailable(Timer timer) {
atlas git commit: ATLAS-2634: Avoid duplicate message processing.
Repository: atlas Updated Branches: refs/heads/master 015b8bf38 -> f29a2b7bb ATLAS-2634: Avoid duplicate message processing. Signed-off-by: Ashutosh Mestry Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/f29a2b7b Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/f29a2b7b Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/f29a2b7b Branch: refs/heads/master Commit: f29a2b7bb2b555e68d7f5e2b43221f85877aa39c Parents: 015b8bf Author: Ashutosh Mestry Authored: Thu May 3 16:22:10 2018 -0700 Committer: Ashutosh Mestry Committed: Fri May 4 15:54:17 2018 -0700 -- .../apache/atlas/kafka/KafkaNotification.java | 6 +- .../notification/NotificationHookConsumer.java | 42 - .../NotificationHookConsumerKafkaTest.java | 66 3 files changed, 109 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java -- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 80dc514..00e56e3 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -241,8 +241,10 @@ public class KafkaNotification extends AbstractNotification implements Service { } -// Get properties for consumer request -private Properties getConsumerProperties(NotificationType type) { +@VisibleForTesting +public +// Get properties for consumer request +Properties getConsumerProperties(NotificationType type) { // find the configured group id for the given notification type String groupId = properties.getProperty(type.toString().toLowerCase() + "." 
+ CONSUMER_GROUP_ID_PROPERTY); http://git-wip-us.apache.org/repos/asf/atlas/blob/f29a2b7b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java -- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 7a4596a..f5e555d 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -55,6 +55,7 @@ import org.apache.atlas.web.filters.AuditFilter; import org.apache.atlas.web.filters.AuditFilter.AuditLog; import org.apache.atlas.web.service.ServiceState; import org.apache.commons.configuration.Configuration; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -297,10 +298,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final List failedMessages = new ArrayList<>(); private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); +@VisibleForTesting +final FailedCommitOffsetRecorder failedCommitOffsetRecorder; + public HookConsumer(NotificationConsumer consumer) { super("atlas-hook-consumer-thread", false); this.consumer = consumer; +failedCommitOffsetRecorder = new FailedCommitOffsetRecorder(); } @Override @@ -358,6 +363,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } try { + if(failedCommitOffsetRecorder.isMessageReplayed(kafkaMsg.getOffset())) { +commit(kafkaMsg); +return; +} + // Used for intermediate conversions during create and update for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { @@ -558,11 +568,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } private void 
commit(AtlasKafkaMessage kafkaMessage) { -recordFailedMessages(); +boolean commitSucceessStatus = false; +try { +recordFailedMessages(); -TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); +TopicPartition partition = new TopicPartition("ATLAS_HOOK",