Repository: atlas
Updated Branches:
  refs/heads/branch-0.8 de592f7d1 -> 28dda9080


ATLAS-2220: Active state change listener order made predictable

(cherry picked from commit 3959b318e4cc1e8df26f03a4632cfb6ecd8cb357)


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/28dda908
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/28dda908
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/28dda908

Branch: refs/heads/branch-0.8
Commit: 28dda90802689c0926ca14b89065809150df8caa
Parents: de592f7
Author: Madhan Neethiraj <mad...@apache.org>
Authored: Fri Oct 20 14:51:40 2017 -0700
Committer: Madhan Neethiraj <mad...@apache.org>
Committed: Fri Oct 20 17:36:19 2017 -0700

----------------------------------------------------------------------
 .../audit/HBaseBasedAuditRepository.java        |  5 ++++
 .../graph/GraphBackedSearchIndexer.java         |  5 ++++
 .../bootstrap/AtlasTypeDefStoreInitializer.java |  5 ++++
 .../atlas/services/DefaultMetadataService.java  |  5 ++++
 .../listener/ActiveStateChangeHandler.java      | 25 ++++++++++++++++++++
 .../notification/NotificationHookConsumer.java  |  5 ++++
 .../service/ActiveInstanceElectorService.java   | 21 ++++++++++++----
 7 files changed, 67 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
index 5a5a2c1..774934c 100644
--- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
+++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java
@@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository
     public void instanceIsPassive() {
         LOG.info("Reacting to passive: No action for now.");
     }
+
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder();
+    }
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 9cd2991..6eee24b 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
     }
 
     @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder();
+    }
+
+    @Override
     public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing changed typedefs {}", changedTypeDefs);

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
index 93e382c..58df006 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java
@@ -279,6 +279,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler {
         LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()");
     }
 
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder();
+    }
+
     private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) {
         boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion);
 

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
index 99d2107..9eb695c 100755
--- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
+++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java
@@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang
     }
 
     @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder();
+    }
+
+    @Override
     public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
         // All we need here is a restore of the type-system
         LOG.info("TypeSystem reset invoked by TypeRegistry changes");

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
----------------------------------------------------------------------
diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
index 87a69ef..7d282b3 100644
--- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
+++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java
@@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException;
  * The two state transitions we handle are (1) becoming active and (2) becoming passive.
  */
 public interface ActiveStateChangeHandler {
+    public enum HandlerOrder {
+        HBASE_AUDIT_REPOSITORY(0),
+        GRAPH_BACKED_SEARCH_INDEXER(1),
+        TYPEDEF_STORE_INITIALIZER(2),
+        DEFAULT_METADATA_SERVICE(3),
+        NOTIFICATION_HOOK_CONSUMER(4);
+
+
+        private final int order;
+
+        private HandlerOrder(int order) {
+            this.order = order;
+        }
+
+        public int getOrder() { return order; }
+    }
 
     /**
      * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader.
@@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler {
      * @throws {@link AtlasException} if anything is wrong on shutdown
      */
     void instanceIsPassive() throws AtlasException;
+
+
+    /**
+     * Defines the order in which the handler should be called.
+     *   When state becomes active, the handler will be called from low order to high
+     *   When state becomes passive, the handler will be called from high order to low
+     *
+     */
+    int getHandlerOrder();
 }

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 4e0a55d..2df28f3 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -218,6 +218,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         stop();
     }
 
+    @Override
+    public int getHandlerOrder() {
+        return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder();
+    }
+
     static class Timer {
         public void sleep(int interval) throws InterruptedException {
             Thread.sleep(interval);

http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
index 5071204..ad0cb84 100644
--- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java
@@ -34,7 +34,9 @@ import org.springframework.stereotype.Component;
 import javax.inject.Inject;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Set;
 
 /**
@@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     private final ServiceState serviceState;
     private final ActiveInstanceState activeInstanceState;
     private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders;
-    private Collection<ActiveStateChangeHandler> activeStateChangeHandlers;
+    private List<ActiveStateChangeHandler> activeStateChangeHandlers;
     private CuratorFactory curatorFactory;
     private LeaderLatch leaderLatch;
     private String serverId;
@@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     private void cacheActiveStateChangeHandlers() {
         if (activeStateChangeHandlers.size()==0) {
             activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders);
+
+            LOG.info("activeStateChangeHandlers(): before reorder: " + activeStateChangeHandlers);
+
+            Collections.sort(activeStateChangeHandlers, new Comparator<ActiveStateChangeHandler>() {
+                @Override
+                public int compare(ActiveStateChangeHandler lhs, ActiveStateChangeHandler rhs) {
+                    return Integer.compare(lhs.getHandlerOrder(), rhs.getHandlerOrder());
+                }
+            });
+
+            LOG.info("activeStateChangeHandlers(): after reorder: " + activeStateChangeHandlers);
         }
     }
 
@@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene
     public void notLeader() {
         LOG.warn("Server instance with server id {} is removed as leader", serverId);
         serviceState.becomingPassive();
-        for (ActiveStateChangeHandler handler: activeStateChangeHandlers) {
+        for (int idx = activeStateChangeHandlers.size() - 1; idx >= 0; idx--) {
             try {
-                handler.instanceIsPassive();
+                activeStateChangeHandlers.get(idx).instanceIsPassive();
             } catch (AtlasException e) {
                 LOG.error("Error while reacting to passive state.", e);
             }

Reply via email to