Repository: atlas Updated Branches: refs/heads/branch-0.8 de592f7d1 -> 28dda9080
ATLAS-2220: Active state change listener order made predictable (cherry picked from commit 3959b318e4cc1e8df26f03a4632cfb6ecd8cb357) Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/28dda908 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/28dda908 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/28dda908 Branch: refs/heads/branch-0.8 Commit: 28dda90802689c0926ca14b89065809150df8caa Parents: de592f7 Author: Madhan Neethiraj <mad...@apache.org> Authored: Fri Oct 20 14:51:40 2017 -0700 Committer: Madhan Neethiraj <mad...@apache.org> Committed: Fri Oct 20 17:36:19 2017 -0700 ---------------------------------------------------------------------- .../audit/HBaseBasedAuditRepository.java | 5 ++++ .../graph/GraphBackedSearchIndexer.java | 5 ++++ .../bootstrap/AtlasTypeDefStoreInitializer.java | 5 ++++ .../atlas/services/DefaultMetadataService.java | 5 ++++ .../listener/ActiveStateChangeHandler.java | 25 ++++++++++++++++++++ .../notification/NotificationHookConsumer.java | 5 ++++ .../service/ActiveInstanceElectorService.java | 21 ++++++++++++---- 7 files changed, 67 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java index 5a5a2c1..774934c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/HBaseBasedAuditRepository.java @@ -406,4 +406,9 @@ public class HBaseBasedAuditRepository implements Service, EntityAuditRepository public void instanceIsPassive() { LOG.info("Reacting to passive: No action for now."); } + + @Override + public int getHandlerOrder() { + return HandlerOrder.HBASE_AUDIT_REPOSITORY.getOrder(); + } } http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index 9cd2991..6eee24b 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -698,6 +698,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang } @Override + public int getHandlerOrder() { + return HandlerOrder.GRAPH_BACKED_SEARCH_INDEXER.getOrder(); + } + + @Override public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { if (LOG.isDebugEnabled()) { LOG.debug("Processing changed typedefs {}", changedTypeDefs); http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java index 93e382c..58df006 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/bootstrap/AtlasTypeDefStoreInitializer.java @@ -279,6 +279,11 @@ public class AtlasTypeDefStoreInitializer implements ActiveStateChangeHandler { LOG.info("<== AtlasTypeDefStoreInitializer.instanceIsPassive()"); } + @Override + public int getHandlerOrder() { + return HandlerOrder.TYPEDEF_STORE_INITIALIZER.getOrder(); + } + private static boolean updateTypeAttributes(AtlasStructDef oldStructDef, AtlasStructDef newStructDef, boolean checkTypeVersion) { boolean ret = isTypeUpdateApplicable(oldStructDef, newStructDef, checkTypeVersion); http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java index 99d2107..9eb695c 100755 --- a/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java +++ b/repository/src/main/java/org/apache/atlas/services/DefaultMetadataService.java @@ -797,6 +797,11 @@ public class DefaultMetadataService implements MetadataService, ActiveStateChang } @Override + public int getHandlerOrder() { + return HandlerOrder.DEFAULT_METADATA_SERVICE.getOrder(); + } + + @Override public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { // All we need here is a restore of the type-system LOG.info("TypeSystem reset invoked by TypeRegistry changes"); http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java ---------------------------------------------------------------------- diff --git a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java index 87a69ef..7d282b3 100644 --- a/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java +++ b/server-api/src/main/java/org/apache/atlas/listener/ActiveStateChangeHandler.java @@ -26,6 +26,22 @@ import org.apache.atlas.AtlasException; * The two state transitions we handle are (1) becoming active and (2) becoming passive. */ public interface ActiveStateChangeHandler { + public enum HandlerOrder { + HBASE_AUDIT_REPOSITORY(0), + GRAPH_BACKED_SEARCH_INDEXER(1), + TYPEDEF_STORE_INITIALIZER(2), + DEFAULT_METADATA_SERVICE(3), + NOTIFICATION_HOOK_CONSUMER(4); + + + private final int order; + + private HandlerOrder(int order) { + this.order = order; + } + + public int getOrder() { return order; } + } /** * Callback that is invoked on an implementor when this instance of Atlas server is declared the leader. @@ -46,4 +62,13 @@ public interface ActiveStateChangeHandler { * @throws {@link AtlasException} if anything is wrong on shutdown */ void instanceIsPassive() throws AtlasException; + + + /** + * Defines the order in which the handler should be called. + * When state becomes active, the handler will be called from low order to high + * When state becomes passive, the handler will be called from high order to low + * + */ + int getHandlerOrder(); } http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 4e0a55d..2df28f3 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -218,6 +218,11 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl stop(); } + @Override + public int getHandlerOrder() { + return HandlerOrder.NOTIFICATION_HOOK_CONSUMER.getOrder(); + } + static class Timer { public void sleep(int interval) throws InterruptedException { Thread.sleep(interval); http://git-wip-us.apache.org/repos/asf/atlas/blob/28dda908/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java index 5071204..ad0cb84 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java @@ -34,7 +34,9 @@ import org.springframework.stereotype.Component; import javax.inject.Inject; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; import java.util.Set; /** @@ -58,7 +60,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene private final ServiceState serviceState; private final ActiveInstanceState activeInstanceState; private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; - private Collection<ActiveStateChangeHandler> activeStateChangeHandlers; + private List<ActiveStateChangeHandler> activeStateChangeHandlers; private CuratorFactory curatorFactory; private LeaderLatch leaderLatch; private String serverId; @@ -158,6 +160,17 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene private void cacheActiveStateChangeHandlers() { if (activeStateChangeHandlers.size()==0) { activeStateChangeHandlers.addAll(activeStateChangeHandlerProviders); + + LOG.info("activeStateChangeHandlers(): before reorder: " + activeStateChangeHandlers); + + Collections.sort(activeStateChangeHandlers, new Comparator<ActiveStateChangeHandler>() { + @Override + public int compare(ActiveStateChangeHandler lhs, ActiveStateChangeHandler rhs) { + return Integer.compare(lhs.getHandlerOrder(), rhs.getHandlerOrder()); + } + }); + + LOG.info("activeStateChangeHandlers(): after reorder: " + activeStateChangeHandlers); } } @@ -177,9 +190,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene public void notLeader() { LOG.warn("Server instance with server id {} is removed as leader", serverId); serviceState.becomingPassive(); - for (ActiveStateChangeHandler handler: activeStateChangeHandlers) { + for (int idx = activeStateChangeHandlers.size() - 1; idx >= 0; idx--) { try { - handler.instanceIsPassive(); + activeStateChangeHandlers.get(idx).instanceIsPassive(); } catch (AtlasException e) { LOG.error("Error while reacting to passive state.", e); }