Repository: incubator-atlas
Updated Branches:
  refs/heads/master b0470f50e -> c3808cf1f


ATLAS-1944: Implemented ShutdownableThread for HookConsumer

Signed-off-by: Madhan Neethiraj <mad...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/18745cf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/18745cf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/18745cf4

Branch: refs/heads/master
Commit: 18745cf4b98af9c45e853daa280342dde8da1300
Parents: b0470f5
Author: ashutoshm <ames...@hortonworks.com>
Authored: Wed Jul 12 14:43:45 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Thu Jul 13 13:25:15 2017 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 56 +++++++++++---------
 1 file changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/18745cf4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 9e5b864..0dea0e2 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import kafka.utils.ShutdownableThread;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
@@ -28,7 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
 import 
org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -41,11 +46,12 @@ import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
 import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
-import org.apache.kafka.common.TopicPartition;
+
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Date;
@@ -56,14 +62,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.atlas.AtlasClientV2.CREATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
-import static 
org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
-import static 
org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
-import static 
org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import static 
org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import static org.apache.atlas.AtlasClientV2.*;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -80,7 +79,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     public static final String CONSUMER_THREADS_PROPERTY = 
"atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY = 
"atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = 
"atlas.notification.hook.failedcachesize";
-    public static final String 
CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_RETRY_INTERVAL = 
"atlas.notification.consumer.retry.interval";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
     private final AtlasEntityStore atlasEntityStore;
@@ -177,7 +176,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
     /**
      * Start Kafka consumer threads that read from Kafka topic when server is 
activated.
-     *
+     * <p>
      * Since the consumers create / update entities to the shared backend 
store, only the active instance
      * should perform this activity. Hence, these threads are started only on 
server activation.
      */
@@ -189,7 +188,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
     /**
      * Stop Kafka consumer threads that read from Kafka topic when server is 
de-activated.
-     *
+     * <p>
      * Since the consumers create / update entities to the shared backend 
store, only the active instance
      * should perform this activity. Hence, these threads are stopped only on 
server deactivation.
      */
@@ -205,18 +204,18 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
-    class HookConsumer implements Runnable {
+    class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
         private List<HookNotificationMessage> failedMessages = new 
ArrayList<>();
 
         public HookConsumer(NotificationConsumer<HookNotificationMessage> 
consumer) {
+            super("atlas-hook-consumer-thread", false);
             this.consumer = consumer;
         }
 
-
         @Override
-        public void run() {
+        public void doWork() {
             shouldRun.set(true);
 
             if (!serverAvailable(new NotificationHookConsumer.Timer())) {
@@ -226,7 +225,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             while (shouldRun.get()) {
                 try {
                     List<AtlasKafkaMessage<HookNotificationMessage>> messages 
= consumer.receive(1000L);
-                    for (AtlasKafkaMessage<HookNotificationMessage> msg :  
messages){
+                    for (AtlasKafkaMessage<HookNotificationMessage> msg : 
messages) {
                         handleMessage(msg);
                     }
                 } catch (Throwable t) {
@@ -267,15 +266,17 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                             if (numRetries == 0) { // audit only on the first 
attempt
                                 audit(messageUser, 
UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                      
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), 
partialUpdateRequest.getTypeName()));
+                                        
String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), 
partialUpdateRequest.getTypeName()));
                             }
 
                             Referenceable referenceable = 
partialUpdateRequest.getEntity();
                             entities = 
instanceConverter.toAtlasEntity(referenceable);
 
                             AtlasEntityType entityType = 
typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                            String guid = 
AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, 
Object>(){
-                                { put(partialUpdateRequest.getAttribute(), 
partialUpdateRequest.getAttributeValue()); }
+                            String guid = 
AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, 
Object>() {
+                                {
+                                    put(partialUpdateRequest.getAttribute(), 
partialUpdateRequest.getAttributeValue());
+                                }
                             });
 
                             // There should only be one root entity
@@ -289,13 +290,15 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
                             if (numRetries == 0) { // audit only on the first 
attempt
                                 audit(messageUser, 
DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                      
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), 
deleteRequest.getTypeName()));
+                                        
String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), 
deleteRequest.getTypeName()));
                             }
 
                             try {
                                 AtlasEntityType type = (AtlasEntityType) 
typeRegistry.getType(deleteRequest.getTypeName());
                                 atlasEntityStore.deleteByUniqueAttributes(type,
-                                        new HashMap<String, Object>() {{ 
put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }});
+                                        new HashMap<String, Object>() {{
+                                            put(deleteRequest.getAttribute(), 
deleteRequest.getAttributeValue());
+                                        }});
                             } catch (ClassCastException cle) {
                                 LOG.error("Failed to do a partial update on 
Entity");
                             }
@@ -319,10 +322,10 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
                     break;
                 } catch (Throwable e) {
                     LOG.warn("Error handling message", e);
-                    try{
+                    try {
                         LOG.info("Sleeping for {} ms before retry", 
consumerRetryInterval);
                         Thread.sleep(consumerRetryInterval);
-                    }catch (InterruptedException ie){
+                    } catch (InterruptedException ie) {
                         LOG.error("Notification consumer thread sleep 
interrupted");
                     }
 
@@ -379,9 +382,12 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
             return true;
         }
 
-        public void stop() {
+        @Override
+        public void shutdown() {
+            super.initiateShutdown();
             shouldRun.set(false);
             consumer.close();
+            super.awaitShutdown();
         }
     }
 
@@ -393,4 +399,4 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, 
path, LOCALHOST,
                 DateTimeHelper.formatDateUTC(new Date()));
     }
-}
+}
\ No newline at end of file

Reply via email to