Michael Blow has uploaded this change for review. (https://asterix-gerrit.ics.uci.edu/3394)
Change subject: [NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
......................................................................
[NO ISSUE][*DB][CLUS] Add support for a rebalance required cluster state
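Allow extensions to mandate that a rebalance is required before the
cluster can go ACTIVE, via IClusterStateManager.setRebalanceRequired().
While a rebalance is required, the cluster reports the new
REBALANCE_REQUIRED state instead of ACTIVE. The REBALANCING state is
removed, and a predicate-based IClusterStateManager.waitForState()
overload is added.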
Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
---
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
3 files changed, 55 insertions(+), 22 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/94/3394/1
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
index c2d3303..e13756c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IClusterManagementWork.java
@@ -30,8 +30,8 @@
PENDING, // the metadata node has not yet joined & initialized
RECOVERING, // global recovery has not yet completed
ACTIVE, // cluster is ACTIVE and ready for requests
- REBALANCING, // replication is processing failbacks
- SHUTTING_DOWN // a shutdown request has been received, and is underway
+ SHUTTING_DOWN, // a shutdown request has been received, and is underway
+ REBALANCE_REQUIRED // one or more datasets require rebalance before
the cluster is usable
}
WorkType getClusterManagementWorkType();
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 0e62851..6c39372 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -107,6 +108,14 @@
* @return true if the desired state was reached before timeout occurred
*/
boolean waitForState(ClusterState waitForState, long timeout, TimeUnit
unit)
+ throws HyracksDataException, InterruptedException;
+
+ /**
+ * Blocks until the cluster state matches supplied predicate, or timeout
is exhausted.
+ *
+ * @return the cluster state matching the predicate if it was satisfied
before timeout occurred, otherwise null
+ */
+ ClusterState waitForState(Predicate<ClusterState> condition, long timeout,
TimeUnit unit)
throws HyracksDataException, InterruptedException;
/**
@@ -250,4 +259,10 @@
* @return The metadata cluster partitions
*/
ClusterPartition getMetadataPartition();
+
+ /**
+ * Indicate whether one or more datasets must be rebalanced before the
cluster becomes ACTIVE
+ * @param rebalanceRequired
+ */
+ void setRebalanceRequired(boolean rebalanceRequired) throws
HyracksDataException;
}
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 16a479e..7933cd2 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -28,6 +28,7 @@
import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -75,6 +76,7 @@
private INcLifecycleCoordinator lifecycleCoordinator;
private ICcApplicationContext appCtx;
private ClusterPartition metadataPartition;
+ private boolean rebalanceRequired;
@Override
public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -186,45 +188,55 @@
return;
}
// the metadata bootstrap & global recovery must be complete before
the cluster can be active
- if (metadataNodeActive) {
- if (state != ClusterState.ACTIVE && state !=
ClusterState.RECOVERING) {
- setState(ClusterState.PENDING);
- }
- appCtx.getMetadataBootstrap().init();
-
- if (appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
- setState(ClusterState.ACTIVE);
- } else {
- // start global recovery
- setState(ClusterState.RECOVERING);
- appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
- }
- } else {
+ if (!metadataNodeActive) {
+ setState(ClusterState.PENDING);
+ return;
+ }
+ if (state != ClusterState.ACTIVE && state != ClusterState.RECOVERING) {
setState(ClusterState.PENDING);
}
+ appCtx.getMetadataBootstrap().init();
+
+ if (!appCtx.getGlobalRecoveryManager().isRecoveryCompleted()) {
+ // start global recovery
+ setState(ClusterState.RECOVERING);
+ appCtx.getGlobalRecoveryManager().startGlobalRecovery(appCtx);
+ return;
+ }
+ if (rebalanceRequired) {
+ setState(ClusterState.REBALANCE_REQUIRED);
+ return;
+ }
+ // finally- life is good, set the state to ACTIVE
+ setState(ClusterState.ACTIVE);
}
@Override
- public synchronized void waitForState(ClusterState waitForState) throws
HyracksDataException, InterruptedException {
+ public synchronized void waitForState(ClusterState waitForState) throws
InterruptedException {
while (state != waitForState) {
wait();
}
}
@Override
- public synchronized boolean waitForState(ClusterState waitForState, long
timeout, TimeUnit unit)
- throws HyracksDataException, InterruptedException {
+ public boolean waitForState(ClusterState waitForState, long timeout,
TimeUnit unit) throws InterruptedException {
+ return waitForState(waitForState::equals, timeout, unit) != null;
+ }
+
+ @Override
+ public synchronized ClusterState waitForState(Predicate<ClusterState>
predicate, long timeout, TimeUnit unit)
+ throws InterruptedException {
final long startMillis = System.currentTimeMillis();
final long endMillis = startMillis + unit.toMillis(timeout);
- while (state != waitForState) {
+ while (!predicate.test(state)) {
long millisToSleep = endMillis - System.currentTimeMillis();
if (millisToSleep > 0) {
wait(millisToSleep);
} else {
- return false;
+ return null;
}
}
- return true;
+ return state;
}
@Override
@@ -458,6 +470,12 @@
return metadataPartition;
}
+ @Override
+ public synchronized void setRebalanceRequired(boolean rebalanceRequired)
throws HyracksDataException {
+ this.rebalanceRequired = rebalanceRequired;
+ refreshState();
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters
localCounters) {
final IResourceIdManager resourceIdManager =
appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
--
To view, visit https://asterix-gerrit.ics.uci.edu/3394
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I863f8bf3fe1ce8d59522c9a28a1283006ffa414c
Gerrit-Change-Number: 3394
Gerrit-PatchSet: 1
Gerrit-Owner: Michael Blow <[email protected]>