Michael Blow has submitted this change and it was merged. ( https://asterix-gerrit.ics.uci.edu/3394 )
Change subject: [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state ...................................................................... [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state Allow extensions to mandate that a rebalance is required in order for the cluster to go active Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c Reviewed-on: https://asterix-gerrit.ics.uci.edu/3394 Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> --- M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java 3 files changed, 55 insertions(+), 22 deletions(-) Approvals: Jenkins: Verified; ; Verified Murtadha Hubail: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java index c2d3303..e13756c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java @@ -30,8 +30,8 @@ PENDING, // the metadata node has not yet joined & initialized RECOVERING, // global recovery has not yet completed ACTIVE, // cluster is ACTIVE and ready for requests - REBALANCING, // replication is processing failbacks - SHUTTING_DOWN // a shutdown request has been received, and is underway + SHUTTING_DOWN, // a shutdown request has been received, and is underway + REBALANCE_REQUIRED // one or more datasets require rebalance before the cluster is usable } WorkType getClusterManagementWorkType(); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 0e62851..6c39372 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -107,6 +108,14 @@ * @return true if the desired state was reached before timeout occurred */ boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) + throws HyracksDataException, InterruptedException; + + /** + * Blocks until the cluster state matches supplied predicate, or timeout is exhausted. + * + * @return the cluster state matching the predicate if it was satisfied before timeout occurred, otherwise null + */ + ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeUnit unit) throws HyracksDataException, InterruptedException; /** @@ -250,4 +259,10 @@ * @return The metadata cluster partitions */ ClusterPartition getMetadataPartition(); + + /** + * Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE + * @param rebalanceRequired + */ + void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 16a479e..7933cd2 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -28,6 +28,7 @@ import java.util.SortedMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; @@ -75,6 +76,7 @@ private INcLifecycleCoordinator lifecycleCoordinator; private ICcApplicationContext appCtx; private ClusterPartition metadataPartition; + private boolean rebalanceRequired; @Override public void setCcAppCtx(ICcApplicationContext appCtx) { @@ -186,45 +188,55 @@ return; } // the metadata bootstrap & global recovery must be complete before the cluster can be active - if (metadataNodeActive) { - if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) { - setState(ClusterState.PENDING); - } - appCtx.getMetadataBootstrap().init(); - - if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) { - setState(ClusterState.ACTIVE); - } else { - // start global recovery - setState(ClusterState.RECOVERING); - appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx); - } - } else { + if (!metadataNodeActive) { + setState(ClusterState.PENDING); + return; + } + if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) { setState(ClusterState.PENDING); } + appCtx.getMetadataBootstrap().init(); + + if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) { + // start global recovery + setState(ClusterState.RECOVERING); + appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx); + return; + } + if (rebalanceRequired) { + setState(ClusterState.REBALANCE_REQUIRED); + return; + } + // finally- life is good, set the state to ACTIVE + setState(ClusterState.ACTIVE); } @Override - public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException { + public synchronized void waitForState(ClusterState waitForState) throws InterruptedException { while (state != waitForState) { wait(); } } @Override - public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) - throws HyracksDataException, InterruptedException { + public boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) throws InterruptedException { + return waitForState(waitForState::equals, timeout, unit) != null; + } + + @Override + public synchronized ClusterState waitForState(Predicate<ClusterState> predicate, long timeout, TimeUnit unit) + throws InterruptedException { final long startMillis = System.currentTimeMillis(); final long endMillis = startMillis + unit.toMillis(timeout); - while (state != waitForState) { + while (!predicate.test(state)) { long millisToSleep = endMillis - System.currentTimeMillis(); if (millisToSleep > 0) { wait(millisToSleep); } else { - return false; + return null; } } - return true; + return state; } @Override @@ -458,6 +470,12 @@ return metadataPartition; } + @Override + public synchronized void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException { + this.rebalanceRequired = rebalanceRequired; + refreshState(); + } + private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) { final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); resourceIdManager.report(nodeId, localCounters.getMaxResourceId()); -- To view, visit https://asterix-gerrit.ics.uci.edu/3394 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: merged Gerrit-Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c Gerrit-Change-Number: 3394 Gerrit-PatchSet: 2 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]>
