ATLAS-1908: updated to use existing Kafka consumer properties when equivalent new Kafka consumer properties are not present
Signed-off-by: Madhan Neethiraj <mad...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/0d8f9f8d Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/0d8f9f8d Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/0d8f9f8d Branch: refs/heads/feature-odf Commit: 0d8f9f8d2d3902471dc246bd51d3eb7512021f74 Parents: de91608 Author: nixonrodrigues <ni...@apache.org> Authored: Mon Jul 3 18:18:47 2017 +0530 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Jul 14 15:28:58 2017 -0700 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 2 +- .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 13 +++++++------ .../org/apache/atlas/kafka/KafkaNotification.java | 12 ++++++++++-- .../atlas/notification/NotificationConsumer.java | 3 +-- .../org/apache/atlas/kafka/KafkaConsumerTest.java | 16 ++++++++-------- .../apache/atlas/kafka/KafkaNotificationTest.java | 2 +- .../AbstractNotificationConsumerTest.java | 8 ++++---- .../notification/NotificationHookConsumer.java | 2 +- .../NotificationHookConsumerKafkaTest.java | 2 +- .../atlas/web/integration/BaseResourceIT.java | 2 +- 10 files changed, 35 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index c3213df..29a4cc1 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -79,7 +79,7 @@ atlas.kafka.hook.group.id=atlas atlas.kafka.enable.auto.commit=false atlas.kafka.auto.offset.reset=earliest atlas.kafka.session.timeout.ms=30000 - +atlas.kafka.poll.timeout.ms=1000 atlas.notification.create.topics=true atlas.notification.replicas=1 http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index 9c15243..52d0916 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -41,19 +41,20 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { private static final Logger LOG = LoggerFactory.getLogger(AtlasKafkaConsumer.class); private final KafkaConsumer kafkaConsumer; - private final boolean autoCommitEnabled; + private final boolean autoCommitEnabled; + private long pollTimeoutMilliSeconds = 1000L; - public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) { + public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long pollTimeoutMilliSeconds) { super(deserializer); - - this.kafkaConsumer = kafkaConsumer; + this.kafkaConsumer = kafkaConsumer; this.autoCommitEnabled = autoCommitEnabled; + this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds; } - public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { + public List<AtlasKafkaMessage<T>> receive() { List<AtlasKafkaMessage<T>> messages = new ArrayList(); - ConsumerRecords<?, ?> records = kafkaConsumer.poll(timeoutMilliSeconds); + ConsumerRecords<?, ?> records = kafkaConsumer.poll(pollTimeoutMilliSeconds); if (records != null) { for (ConsumerRecord<?, ?> record : records) { http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 366c8a7..38889ef 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -83,6 +83,7 @@ public class KafkaNotification extends AbstractNotification implements Service { private Properties properties; private KafkaConsumer consumer = null; private KafkaProducer producer = null; + private Long pollTimeOutMs = 1000L; private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() { { @@ -124,6 +125,13 @@ public class KafkaNotification extends AbstractNotification implements Service { properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000); + boolean oldApiCommitEnbleFlag = subsetConfiguration.getBoolean("auto.commit.enable",false); + //set old autocommit value if new autoCommit property is not set. + properties.put("enable.auto.commit", subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag)); + properties.put("session.timeout.ms", subsetConfiguration.getString("session.timeout.ms", "30000")); + } @VisibleForTesting @@ -167,7 +175,7 @@ public class KafkaNotification extends AbstractNotification implements Service { public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) { return createConsumers(notificationType, numConsumers, - Boolean.valueOf(properties.getProperty("enable.auto.commit", "true"))); + Boolean.valueOf(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable","false")))); } @VisibleForTesting @@ -177,7 +185,7 @@ public class KafkaNotification extends AbstractNotification implements Service { Properties consumerProperties = getConsumerProperties(notificationType); List<NotificationConsumer<T>> consumers = new ArrayList<>(); - AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled); + AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType.getDeserializer(), getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs ); consumers.add(kafkaConsumer); return consumers; } http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java index 22e40f9..6d1c08a 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java @@ -41,8 +41,7 @@ public interface NotificationConsumer<T> { /** * Fetch data for the topics from Kafka - * @param timeoutMilliSeconds poll timeout * @return List containing kafka message and partionId and offset. */ - List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds); + List<AtlasKafkaMessage<T>> receive(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 70059cb..9b712f4 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -95,12 +95,12 @@ public class KafkaConsumerTest { ConsumerRecords records = new ConsumerRecords(mp); - when(kafkaConsumer.poll(1000)).thenReturn(records); + when(kafkaConsumer.poll(100)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer,false); - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000); + AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); + List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); @@ -131,12 +131,12 @@ public class KafkaConsumerTest { mp.put(tp,klist); ConsumerRecords records = new ConsumerRecords(mp); - when(kafkaConsumer.poll(1000)).thenReturn(records); + when(kafkaConsumer.poll(100L)).thenReturn(records); when(messageAndMetadata.message()).thenReturn(json); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer ,false, 100L); try { - List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(1000); + List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> messageList = consumer.receive(); assertTrue(messageList.size() > 0); HookNotification.HookNotificationMessage consumedMessage = messageList.get(0).getMessage(); @@ -154,7 +154,7 @@ public class KafkaConsumerTest { TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, false, 100L); consumer.commit(tp, 1); @@ -166,7 +166,7 @@ public class KafkaConsumerTest { TopicPartition tp = new TopicPartition("ATLAS_HOOK",0); - AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true); + AtlasKafkaConsumer consumer =new AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(), kafkaConsumer, true , 100L); consumer.commit(tp, 1); http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java index c791d43..a1e13b9 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java @@ -70,7 +70,7 @@ public class KafkaNotificationTest { List<AtlasKafkaMessage<Object>> messages = null ; long startTime = System.currentTimeMillis(); //fetch starting time while ((System.currentTimeMillis() - startTime) < 10000) { - messages = consumer.receive(1000L); + messages = consumer.receive(); if (messages.size() > 0) { break; } http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index 8324b57..68fe3d7 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -67,7 +67,7 @@ public class AbstractNotificationConsumerTest { NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L); + List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); assertFalse(messageList.isEmpty()); @@ -106,7 +106,7 @@ public class AbstractNotificationConsumerTest { NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); - List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L); + List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); assertEquals(new TestMessage("sValue1", 99), messageList.get(0).getMessage()); @@ -138,7 +138,7 @@ public class AbstractNotificationConsumerTest { NotificationConsumer<TestMessage> consumer = new TestNotificationConsumer<>(versionedMessageType, jsonList, logger); try { - List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(1000L); + List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive(); messageList.get(1).getMessage(); @@ -203,7 +203,7 @@ public class AbstractNotificationConsumerTest { } @Override - public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) { + public List<AtlasKafkaMessage<T>> receive() { List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList(); for(Object json : messageList) { tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1)); http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 0dea0e2..51276d3 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -224,7 +224,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl while (shouldRun.get()) { try { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L); + List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { handleMessage(msg); } http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 650ca0a..eb37fa8 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -155,7 +155,7 @@ public class NotificationHookConsumerKafkaTest { try { long startTime = System.currentTimeMillis(); //fetch starting time while ((System.currentTimeMillis() - startTime) < 10000) { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L); + List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { hookConsumer.handleMessage(msg); http://git-wip-us.apache.org/repos/asf/atlas/blob/0d8f9f8d/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java index c036cfa..496185f 100755 --- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java +++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java @@ -637,7 +637,7 @@ public abstract class BaseResourceIT { try { while (System.currentTimeMillis() < maxCurrentTime) { - List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(1000); + List<AtlasKafkaMessage<EntityNotification>> messageList = consumer.receive(); if(messageList.size() > 0) { EntityNotification notification = messageList.get(0).getMessage(); if (predicate.evaluate(notification)) {