Michael Blow has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3394 )


Change subject: [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
......................................................................

[NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state

Allow extensions to mandate that a rebalance is required in order for
the cluster to go active

Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
---
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
3 files changed, 55 insertions(+), 22 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/94/3394/1

diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index c2d3303..e13756c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -30,8 +30,8 @@
         PENDING, // the metadata node has not yet joined & initialized
         RECOVERING, // global recovery has not yet completed
         ACTIVE, // cluster is ACTIVE and ready for requests
-        REBALANCING, // replication is processing failbacks
-        SHUTTING_DOWN // a shutdown request has been received, and is underway
+        SHUTTING_DOWN, // a shutdown request has been received, and is underway
+        REBALANCE_REQUIRED // one or more datasets require rebalance before the cluster is usable
     }

     WorkType getClusterManagementWorkType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0e62851..6c39372 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;

 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -107,6 +108,14 @@
      * @return true if the desired state was reached before timeout occurred
      */
     boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
+            throws HyracksDataException, InterruptedException;
+
+    /**
+     * Blocks until the cluster state matches supplied predicate, or timeout is exhausted.
+     *
+     * @return the cluster state matching the predicate if it was satisfied before timeout occurred, otherwise null
+     */
+    ClusterState waitForState(Predicate<ClusterState> condition, long timeout, TimeUnit unit)
             throws HyracksDataException, InterruptedException;

     /**
@@ -250,4 +259,10 @@
      * @return The metadata cluster partitions
      */
     ClusterPartition getMetadataPartition();
+
+    /**
+     * Indicate whether one or more datasets must be rebalanced before the cluster becomes ACTIVE
+     * @param rebalanceRequired
+     */
+    void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 16a479e..7933cd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -28,6 +28,7 @@
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;

 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -75,6 +76,7 @@
     private INcLifecycleCoordinator lifecycleCoordinator;
     private ICcApplicationContext appCtx;
     private ClusterPartition metadataPartition;
+    private boolean rebalanceRequired;

     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -186,45 +188,55 @@
             return;
         }
         // the metadata bootstrap & global recovery must be complete before the cluster can be active
-        if (metadataNodeActive) {
-            if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
-                setState(ClusterState.PENDING);
-            }
-            appCtx.getMetadataBootstrap().init();
-
-            if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
-                setState(ClusterState.ACTIVE);
-            } else {
-                // start global recovery
-                setState(ClusterState.RECOVERING);
-                appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
-            }
-        } else {
+        if (!metadataNodeActive) {
+            setState(ClusterState.PENDING);
+            return;
+        }
+        if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
             setState(ClusterState.PENDING);
         }
+        appCtx.getMetadataBootstrap().init();
+
+        if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+            // start global recovery
+            setState(ClusterState.RECOVERING);
+            appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+            return;
+        }
+        if (rebalanceRequired) {
+            setState(ClusterState.REBALANCE_REQUIRED);
+            return;
+        }
+        // finally- life is good, set the state to ACTIVE
+        setState(ClusterState.ACTIVE);
     }

     @Override
-    public synchronized void waitForState(ClusterState waitForState) throws HyracksDataException, InterruptedException {
+    public synchronized void waitForState(ClusterState waitForState) throws InterruptedException {
         while (state != waitForState) {
             wait();
         }
     }

     @Override
-    public synchronized boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit)
-            throws HyracksDataException, InterruptedException {
+    public boolean waitForState(ClusterState waitForState, long timeout, TimeUnit unit) throws InterruptedException {
+        return waitForState(waitForState::equals, timeout, unit) != null;
+    }
+
+    @Override
+    public synchronized ClusterState waitForState(Predicate<ClusterState> predicate, long timeout, TimeUnit unit)
+            throws InterruptedException {
         final long startMillis = System.currentTimeMillis();
         final long endMillis = startMillis + unit.toMillis(timeout);
-        while (state != waitForState) {
+        while (!predicate.test(state)) {
             long millisToSleep = endMillis - System.currentTimeMillis();
             if (millisToSleep > 0) {
                 wait(millisToSleep);
             } else {
-                return false;
+                return null;
             }
         }
-        return true;
+        return state;
     }

     @Override
@@ -458,6 +470,12 @@
         return metadataPartition;
     }

+    @Override
+    public synchronized void setRebalanceRequired(boolean rebalanceRequired) throws HyracksDataException {
+        this.rebalanceRequired = rebalanceRequired;
+        refreshState();
+    }
+
     private void updateClusterCounters(String nodeId, NcLocalCounters localCounters) {
         final IResourceIdManager resourceIdManager = appCtx.getResourceIdManager();
         resourceIdManager.report(nodeId, localCounters.getMaxResourceId());

--
To view, visit https://asterix-gerrit.ics.uci.edu/3394
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
Gerrit-Change-Number: 3394
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to