Michael Blow has submitted this change and it was merged.

Change subject: Wait For ClusterState ACTIVE Before Notifying ZK
......................................................................
Wait For ClusterState ACTIVE Before Notifying ZK Defer notifying ZooKeeper that the cluster is up until it transitions to ACTIVE Change-Id: Ieaaeb2876edad9cfa3f23c2cbe00e058bdc1c8cc Reviewed-on: https://asterix-gerrit.ics.uci.edu/1678 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 5 files changed, 62 insertions(+), 21 deletions(-) Approvals: Ian Maxon: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java index 634824c..3cdeb6f 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,6 +39,7 @@ import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.api.exceptions.HyracksDataException; /** * Base 
class for language translators. Contains the common validation logic for language @@ -52,17 +54,15 @@ if (!(ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE) && ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted())) { int maxWaitCycles = AppContextInfo.INSTANCE.getExternalProperties().getMaxWaitClusterActive(); - int waitCycleCount = 0; try { - while (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE) - && waitCycleCount < maxWaitCycles) { - Thread.sleep(1000); - waitCycleCount++; - } + ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE, maxWaitCycles, TimeUnit.SECONDS); + } catch (HyracksDataException e) { + throw new AsterixException(e); } catch (InterruptedException e) { if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("Thread interrupted while waiting for cluster to be " + ClusterState.ACTIVE); } + Thread.currentThread().interrupt(); } if (!ClusterStateManager.INSTANCE.getState().equals(ClusterState.ACTIVE)) { throw new AsterixException("Cluster is in " + ClusterState.UNUSABLE + " state." 
@@ -91,6 +91,7 @@ if (LOGGER.isLoggable(Level.WARNING)) { LOGGER.warning("Thread interrupted while waiting for cluster to complete global recovery "); } + Thread.currentThread().interrupt(); } if (!ClusterStateManager.INSTANCE.isGlobalRecoveryCompleted()) { throw new AsterixException("Cluster Global recovery is not yet complete and the system is in " diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index f3c9744..53fc7ec 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -121,11 +121,7 @@ } } // Wait until cluster becomes active - synchronized (ClusterStateManager.INSTANCE) { - while (ClusterStateManager.INSTANCE.getState() != ClusterState.ACTIVE) { - ClusterStateManager.INSTANCE.wait(); - } - } + ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort()); ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index cb2bf64..578c206 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -52,6 +53,7 @@ import 
org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import org.apache.asterix.common.api.AsterixThreadFactory; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; @@ -71,6 +73,7 @@ import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.AppContextInfo; +import org.apache.asterix.runtime.utils.ClusterStateManager; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IServiceContext; @@ -194,7 +197,7 @@ jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, AppContextInfo.INSTANCE); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, - ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor()); + ccServiceCtx.getControllerService().getExecutor()); // AQL rest APIs. 
addServlet(jsonAPIServer, Servlets.AQL_QUERY); @@ -291,13 +294,16 @@ } private IStatementExecutorFactory getStatementExecutorFactory() { - return ccExtensionManager.getStatementExecutorFactory( - ((ClusterControllerService) ccServiceCtx.getControllerService()).getExecutor()); + return ccExtensionManager.getStatementExecutorFactory(ccServiceCtx.getControllerService().getExecutor()); } @Override public void startupCompleted() throws Exception { - ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); + ccServiceCtx.getControllerService().getExecutor().submit((Callable)() -> { + ClusterStateManager.INSTANCE.waitForState(ClusterState.ACTIVE); + ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); + return null; + }); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index bf03d54..a753db3 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.cluster; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.hyracks.api.config.IOption; @@ -83,4 +84,17 @@ * @return A copy of the current state of the cluster partitions. */ ClusterPartition[] getClusterPartitons(); + + /** + * Blocks until the cluster state becomes {@code state} + */ + void waitForState(ClusterState state) throws HyracksDataException, InterruptedException; + + /** + * Blocks until the cluster state becomes {@code state}, or timeout is exhausted. 
+ * @return true if the desired state was reached before timeout occurred + */ + boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) + throws HyracksDataException, InterruptedException; + } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index f65979f..6bfbf77 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -40,11 +41,11 @@ import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; +import org.apache.hyracks.control.common.controllers.NCConfig; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.hyracks.control.common.controllers.NCConfig; /** * A holder class for properties related to the Asterix cluster. @@ -110,6 +111,8 @@ public synchronized void setState(ClusterState state) { this.state = state; LOGGER.info("Cluster State is now " + state.name()); + // Notify any waiting threads for the cluster state to change. 
+ notifyAll(); } @Override @@ -149,27 +152,48 @@ resetClusterPartitionConstraint(); for (ClusterPartition p : clusterPartitions.values()) { if (!p.isActive()) { - state = ClusterState.UNUSABLE; - LOGGER.info("Cluster is in UNUSABLE state"); + setState(ClusterState.UNUSABLE); return; } } - state = ClusterState.PENDING; + setState(ClusterState.PENDING); LOGGER.info("Cluster is now " + state); // if all storage partitions are active as well as the metadata node, then the cluster is active if (metadataNodeActive) { AppContextInfo.INSTANCE.getMetadataBootstrap().init(); - state = ClusterState.ACTIVE; + setState(ClusterState.ACTIVE); LOGGER.info("Cluster is now " + state); - // Notify any waiting threads for the cluster to be active. notifyAll(); // start global recovery AppContextInfo.INSTANCE.getGlobalRecoveryManager().startGlobalRecovery(); } } + @Override + public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException { + while (state != waitForState) { + wait(); + } + } + + @Override + public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) + throws HyracksDataException, InterruptedException { + final long startMillis = System.currentTimeMillis(); + final long endMillis = startMillis + unit.toMillis(timeout); + while (state != waitForState) { + long millisToSleep = endMillis - System.currentTimeMillis(); + if (millisToSleep > 0) { + wait(millisToSleep); + } else { + return false; + } + } + return true; + } + /** * Returns the IO devices configured for a Node Controller * -- To view, visit https://asterix-gerrit.ics.uci.edu/1678 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ieaaeb2876edad9cfa3f23c2cbe00e058bdc1c8cc Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email 
protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
