Repository: incubator-atlas
Updated Branches:
  refs/heads/master de9160844 -> 0d8f9f8d2


ATLAS-1908: updated to use existing Kafka consumer properties when equivalent 
new Kafka consumer properties are not present

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/0d8f9f8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/0d8f9f8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/0d8f9f8d

Branch: refs/heads/master
Commit: 0d8f9f8d2d3902471dc246bd51d3eb7512021f74
Parents: de91608
Author: nixonrodrigues <ni...@apache.org>
Authored: Mon Jul 3 18:18:47 2017 +0530
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Jul 14 15:28:58 2017 -0700

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties        |  2 +-
 .../org/apache/atlas/kafka/AtlasKafkaConsumer.java  | 13 +++++++------
 .../org/apache/atlas/kafka/KafkaNotification.java   | 12 ++++++++++--
 .../atlas/notification/NotificationConsumer.java    |  3 +--
 .../org/apache/atlas/kafka/KafkaConsumerTest.java   | 16 ++++++++--------
 .../apache/atlas/kafka/KafkaNotificationTest.java   |  2 +-
 .../AbstractNotificationConsumerTest.java           |  8 ++++----
 .../notification/NotificationHookConsumer.java      |  2 +-
 .../NotificationHookConsumerKafkaTest.java          |  2 +-
 .../atlas/web/integration/BaseResourceIT.java       |  2 +-
 10 files changed, 35 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties 
b/distro/src/conf/atlas-application.properties
index c3213df..29a4cc1 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -79,7 +79,7 @@ atlas.kafka.hook.group.id=atlas
 atlas.kafka.enable.auto.commit=false
 atlas.kafka.auto.offset.reset=earliest
 atlas.kafka.session.timeout.ms=30000
-
+atlas.kafka.poll.timeout.ms=1000
 
 atlas.notification.create.topics=true
 atlas.notification.replicas=1

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java 
b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
index 9c15243..52d0916 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java
@@ -41,19 +41,20 @@ public class AtlasKafkaConsumer<T> extends 
AbstractNotificationConsumer<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasKafkaConsumer.class);
 
     private final KafkaConsumer kafkaConsumer;
-    private final boolean       autoCommitEnabled;
+    private final boolean autoCommitEnabled;
+    private long pollTimeoutMilliSeconds = 1000L;
 
-    public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, 
KafkaConsumer kafkaConsumer, boolean autoCommitEnabled) {
+    public AtlasKafkaConsumer(MessageDeserializer<T> deserializer, 
KafkaConsumer kafkaConsumer, boolean autoCommitEnabled, long 
pollTimeoutMilliSeconds) {
         super(deserializer);
-
-        this.kafkaConsumer     = kafkaConsumer;
+        this.kafkaConsumer = kafkaConsumer;
         this.autoCommitEnabled = autoCommitEnabled;
+        this.pollTimeoutMilliSeconds = pollTimeoutMilliSeconds;
     }
 
-    public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+    public List<AtlasKafkaMessage<T>> receive() {
         List<AtlasKafkaMessage<T>> messages = new ArrayList();
 
-        ConsumerRecords<?, ?> records = 
kafkaConsumer.poll(timeoutMilliSeconds);
+        ConsumerRecords<?, ?> records = 
kafkaConsumer.poll(pollTimeoutMilliSeconds);
 
         if (records != null) {
             for (ConsumerRecord<?, ?> record : records) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java 
b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 366c8a7..38889ef 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -83,6 +83,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     private Properties properties;
     private KafkaConsumer consumer = null;
     private KafkaProducer producer = null;
+    private Long pollTimeOutMs = 1000L;
 
     private static final Map<NotificationType, String> TOPIC_MAP = new 
HashMap<NotificationType, String>() {
         {
@@ -124,6 +125,13 @@ public class KafkaNotification extends 
AbstractNotification implements Service {
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        pollTimeOutMs = subsetConfiguration.getLong("poll.timeout.ms", 1000);
+        boolean oldApiCommitEnbleFlag = 
subsetConfiguration.getBoolean("auto.commit.enable",false);
+        //set old autocommit value if new autoCommit property is not set.
+        properties.put("enable.auto.commit", 
subsetConfiguration.getBoolean("enable.auto.commit", oldApiCommitEnbleFlag));
+        properties.put("session.timeout.ms", 
subsetConfiguration.getString("session.timeout.ms", "30000"));
+
     }
 
     @VisibleForTesting
@@ -167,7 +175,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType 
notificationType,
                                                              int numConsumers) 
{
         return createConsumers(notificationType, numConsumers,
-                Boolean.valueOf(properties.getProperty("enable.auto.commit", 
"true")));
+                Boolean.valueOf(properties.getProperty("enable.auto.commit", 
properties.getProperty("auto.commit.enable","false"))));
     }
 
     @VisibleForTesting
@@ -177,7 +185,7 @@ public class KafkaNotification extends AbstractNotification 
implements Service {
         Properties consumerProperties = 
getConsumerProperties(notificationType);
 
         List<NotificationConsumer<T>> consumers = new ArrayList<>();
-        AtlasKafkaConsumer kafkaConsumer = new 
AtlasKafkaConsumer(notificationType.getDeserializer(), 
getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), 
autoCommitEnabled);
+        AtlasKafkaConsumer kafkaConsumer = new 
AtlasKafkaConsumer(notificationType.getDeserializer(), 
getKafkaConsumer(consumerProperties,notificationType, autoCommitEnabled), 
autoCommitEnabled, pollTimeOutMs );
         consumers.add(kafkaConsumer);
         return consumers;
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 22e40f9..6d1c08a 100644
--- 
a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ 
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -41,8 +41,7 @@ public interface NotificationConsumer<T> {
 
     /**
      * Fetch data for the topics from Kafka
-     * @param timeoutMilliSeconds poll timeout
      * @return List containing kafka message and partionId and offset.
      */
-    List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds);
+    List<AtlasKafkaMessage<T>> receive();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 70059cb..9b712f4 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -95,12 +95,12 @@ public class KafkaConsumerTest {
         ConsumerRecords records = new ConsumerRecords(mp);
 
 
-        when(kafkaConsumer.poll(1000)).thenReturn(records);
+        when(kafkaConsumer.poll(100)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
 
-        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer,false);
-        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive(1000);
+        AtlasKafkaConsumer consumer = new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false, 100L);
+        List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive();
         assertTrue(messageList.size() > 0);
 
         HookNotification.HookNotificationMessage consumedMessage  = 
messageList.get(0).getMessage();
@@ -131,12 +131,12 @@ public class KafkaConsumerTest {
         mp.put(tp,klist);
         ConsumerRecords records = new ConsumerRecords(mp);
 
-        when(kafkaConsumer.poll(1000)).thenReturn(records);
+        when(kafkaConsumer.poll(100L)).thenReturn(records);
         when(messageAndMetadata.message()).thenReturn(json);
 
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer ,false);
+        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer ,false, 100L);
         try {
-            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive(1000);
+            List<AtlasKafkaMessage<HookNotification.HookNotificationMessage>> 
messageList = consumer.receive();
             assertTrue(messageList.size() > 0);
 
             HookNotification.HookNotificationMessage consumedMessage  = 
messageList.get(0).getMessage();
@@ -154,7 +154,7 @@ public class KafkaConsumerTest {
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false);
+        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, false, 100L);
 
         consumer.commit(tp, 1);
 
@@ -166,7 +166,7 @@ public class KafkaConsumerTest {
 
         TopicPartition tp = new TopicPartition("ATLAS_HOOK",0);
 
-        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, true);
+        AtlasKafkaConsumer consumer =new 
AtlasKafkaConsumer(NotificationInterface.NotificationType.HOOK.getDeserializer(),
 kafkaConsumer, true , 100L);
 
         consumer.commit(tp, 1);
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index c791d43..a1e13b9 100644
--- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -70,7 +70,7 @@ public class KafkaNotificationTest {
         List<AtlasKafkaMessage<Object>> messages = null ;
         long startTime = System.currentTimeMillis(); //fetch starting time
         while ((System.currentTimeMillis() - startTime) < 10000) {
-             messages = consumer.receive(1000L);
+             messages = consumer.receive();
             if (messages.size() > 0) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index 8324b57..68fe3d7 100644
--- 
a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ 
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -67,7 +67,7 @@ public class AbstractNotificationConsumerTest {
         NotificationConsumer<TestMessage> consumer =
                 new TestNotificationConsumer<>(versionedMessageType, jsonList, 
logger);
 
-        List<AtlasKafkaMessage<TestMessage>> messageList = 
consumer.receive(1000L);
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
         assertFalse(messageList.isEmpty());
 
@@ -106,7 +106,7 @@ public class AbstractNotificationConsumerTest {
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, 
logger);
 
-        List<AtlasKafkaMessage<TestMessage>> messageList = 
consumer.receive(1000L);
+        List<AtlasKafkaMessage<TestMessage>> messageList = consumer.receive();
 
         assertEquals(new TestMessage("sValue1", 99), 
messageList.get(0).getMessage());
 
@@ -138,7 +138,7 @@ public class AbstractNotificationConsumerTest {
         NotificationConsumer<TestMessage> consumer =
             new TestNotificationConsumer<>(versionedMessageType, jsonList, 
logger);
         try {
-            List<AtlasKafkaMessage<TestMessage>> messageList = 
consumer.receive(1000L);
+            List<AtlasKafkaMessage<TestMessage>> messageList = 
consumer.receive();
 
             messageList.get(1).getMessage();
 
@@ -203,7 +203,7 @@ public class AbstractNotificationConsumerTest {
         }
 
         @Override
-        public List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds) {
+        public List<AtlasKafkaMessage<T>> receive() {
             List<AtlasKafkaMessage<T>> tempMessageList = new ArrayList();
             for(Object json :  messageList) {
                 tempMessageList.add(new 
AtlasKafkaMessage(deserializer.deserialize((String)json), -1, -1));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 0dea0e2..51276d3 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -224,7 +224,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
             while (shouldRun.get()) {
                 try {
-                    List<AtlasKafkaMessage<HookNotificationMessage>> messages 
= consumer.receive(1000L);
+                    List<AtlasKafkaMessage<HookNotificationMessage>> messages 
= consumer.receive();
                     for (AtlasKafkaMessage<HookNotificationMessage> msg : 
messages) {
                         handleMessage(msg);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 650ca0a..eb37fa8 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -155,7 +155,7 @@ public class NotificationHookConsumerKafkaTest {
         try {
             long startTime = System.currentTimeMillis(); //fetch starting time
             while ((System.currentTimeMillis() - startTime) < 10000) {
-                List<AtlasKafkaMessage<HookNotificationMessage>> messages = 
consumer.receive(1000L);
+                List<AtlasKafkaMessage<HookNotificationMessage>> messages = 
consumer.receive();
 
                 for (AtlasKafkaMessage<HookNotificationMessage> msg : 
messages) {
                     hookConsumer.handleMessage(msg);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/0d8f9f8d/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java 
b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
index c036cfa..496185f 100755
--- a/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/integration/BaseResourceIT.java
@@ -637,7 +637,7 @@ public abstract class BaseResourceIT {
                 try {
 
                     while (System.currentTimeMillis() < maxCurrentTime) {
-                        List<AtlasKafkaMessage<EntityNotification>> 
messageList = consumer.receive(1000);
+                        List<AtlasKafkaMessage<EntityNotification>> 
messageList = consumer.receive();
                             if(messageList.size() > 0) {
                                 EntityNotification notification = 
messageList.get(0).getMessage();
                                 if (predicate.evaluate(notification)) {

Reply via email to