Repository: atlas
Updated Branches:
  refs/heads/0.8-incubating 57e1991b8 -> 31991ee46


ATLAS-2033: HookConsumer updated to address the case where a consumer is
stopped before it has started. Updated unit tests.

(cherry picked from commit 6f9684b4fb0a1c96993df900305d0c45c9a4e32f)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/31991ee4
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/31991ee4
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/31991ee4

Branch: refs/heads/0.8-incubating
Commit: 31991ee46e6de438fb75dd6be85f033b16b98baa
Parents: 57e1991
Author: ashutoshm <ames...@hortonworks.com>
Authored: Thu Aug 10 09:54:38 2017 -0700
Committer: ashutoshm <ames...@hortonworks.com>
Committed: Thu Aug 10 09:56:24 2017 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 18 +++++--
 .../NotificationHookConsumerTest.java           | 53 ++++++++++++++++----
 2 files changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/31991ee4/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index a74b841..67cb9be 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -95,7 +95,9 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
     private NotificationInterface notificationInterface;
     private ExecutorService executors;
     private Configuration applicationProperties;
-    private List<HookConsumer> consumers;
+
+    @VisibleForTesting
+    List<HookConsumer> consumers;
 
     @Inject
     public NotificationHookConsumer(NotificationInterface 
notificationInterface, AtlasEntityStore atlasEntityStore,
@@ -210,6 +212,7 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         }
     }
 
+    @VisibleForTesting
     class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
@@ -417,6 +420,12 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
         public void shutdown() {
             LOG.info("==> HookConsumer shutdown()");
 
+            // handle the case where thread was not started at all
+            // and shutdown called
+            if(shouldRun.get() == false) {
+                return;
+            }
+
             super.initiateShutdown();
             shouldRun.set(false);
             if (consumer != null) {
@@ -426,7 +435,6 @@ public class NotificationHookConsumer implements Service, 
ActiveStateChangeHandl
 
             LOG.info("<== HookConsumer shutdown()");
         }
-
     }
 
     private void audit(String messageUser, String method, String path) {

http://git-wip-us.apache.org/repos/asf/atlas/blob/31991ee4/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index f4ec56a..a6f58e8 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -33,11 +33,14 @@ import org.apache.atlas.type.AtlasTypeRegistry;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.TopicPartition;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.apache.kafka.common.TopicPartition;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -48,7 +51,6 @@ import static org.testng.AssertJUnit.assertFalse;
 import static org.testng.AssertJUnit.assertTrue;
 
 public class NotificationHookConsumerTest {
-
     @Mock
     private NotificationInterface notificationInterface;
 
@@ -126,7 +128,7 @@ public class NotificationHookConsumerTest {
         when(message.getEntities()).thenReturn(Arrays.asList(mock));
 
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
-        verify(consumer).commit(any(TopicPartition.class),anyInt());
+        verify(consumer).commit(any(TopicPartition.class), anyInt());
     }
 
     @Test
@@ -138,8 +140,10 @@ public class NotificationHookConsumerTest {
                 notificationHookConsumer.new HookConsumer(consumer);
         HookNotification.EntityCreateRequest message = new 
HookNotification.EntityCreateRequest("user",
                 new ArrayList<Referenceable>() {
-            { add(mock(Referenceable.class)); }
-        });
+                    {
+                        add(mock(Referenceable.class));
+                    }
+                });
         when(atlasEntityStore.createOrUpdate(any(EntityStream.class), 
anyBoolean())).thenThrow(new RuntimeException("Simulating exception in 
processing message"));
         hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1));
 
@@ -204,13 +208,44 @@ public class NotificationHookConsumerTest {
 
     @Test
     public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws 
Exception {
+        
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
         
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, 
false)).thenReturn(true);
         
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
         List<NotificationConsumer<Object>> consumers = new ArrayList();
-        consumers.add(mock(NotificationConsumer.class));
-        
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK,
 1)).
-                thenReturn(consumers);
-        NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry);
+        NotificationConsumer notificationConsumerMock = 
mock(NotificationConsumer.class);
+        consumers.add(notificationConsumerMock);
+
+        
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK,
 1)).thenReturn(consumers);
+        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocationOnMock) throws 
Throwable {
+                notificationHookConsumer.consumers.get(0).start();
+                Thread.sleep(500);
+                return null;
+            }
+        
}).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class));
+
+        notificationHookConsumer.startInternal(configuration, executorService);
+        notificationHookConsumer.instanceIsPassive();
+        verify(notificationInterface).close();
+        verify(executorService).shutdown();
+        verify(notificationConsumerMock).wakeup();
+    }
+
+    @Test
+    public void consumersStoppedBeforeStarting() throws Exception {
+        
when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE);
+        
when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, 
false)).thenReturn(true);
+        
when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 
1)).thenReturn(1);
+        List<NotificationConsumer<Object>> consumers = new ArrayList();
+        NotificationConsumer notificationConsumerMock = 
mock(NotificationConsumer.class);
+        consumers.add(notificationConsumerMock);
+
+        
when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK,
 1)).thenReturn(consumers);
+        final NotificationHookConsumer notificationHookConsumer = new 
NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, 
instanceConverter, typeRegistry);
+
         notificationHookConsumer.startInternal(configuration, executorService);
         notificationHookConsumer.instanceIsPassive();
         verify(notificationInterface).close();

Reply via email to