From Michael Blow <[email protected]>: Michael Blow has submitted this change. (https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17608)
Change subject: [NO ISSUE][*DB] Provide ability for IDatasetRebalanceCallback to decline a rebalance ...................................................................... [NO ISSUE][*DB] Provide ability for IDatasetRebalanceCallback to decline a rebalance Change-Id: I1b2cdb56d70ce47c7b15061cc5ecc4af36ed8f61 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17608 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Murtadha Al Hubail <[email protected]> --- M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java 5 files changed, 56 insertions(+), 19 deletions(-) Approvals: Murtadha Al Hubail: Looks good to me, approved Michael Blow: Looks good to me, but someone else must approve Jenkins: Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java index e8683c9..c366977 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/IDatasetRebalanceCallback.java @@ -30,7 +30,7 @@ public interface IDatasetRebalanceCallback { /** - * The action to perform before the target dataset is populated. + * The check to perform before the target dataset is populated. * * @param metadataProvider, * the metadata provider. @@ -40,9 +40,13 @@ * the target dataset. 
* @param hcc, * the hyracks client connection. + * + * @return <code>true</code> if the rebalance of the dataset should proceed, otherwise <code>false</code> to skip. + * If the dataset is skipped, the active metadata transaction context, if any, can be expected to be aborted. + * * @throws HyracksDataException */ - void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, + boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, IHyracksClientConnection hcc) throws HyracksDataException; /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java index 680adbf..7085567 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/rebalance/NoOpDatasetRebalanceCallback.java @@ -33,9 +33,10 @@ } @Override - public void beforeRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, + public boolean canRebalance(MetadataProvider metadataProvider, Dataset source, Dataset target, IHyracksClientConnection hcc) { // Does nothing. 
+ return true; } @Override diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index 61562d8..601cd02 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -20,6 +20,8 @@ import static org.apache.asterix.app.translator.QueryTranslator.abort; import static org.apache.asterix.common.config.DatasetConfig.DatasetType; +import static org.apache.asterix.common.utils.IdentifierUtil.dataset; +import static org.apache.asterix.metadata.utils.DatasetUtil.getFullyQualifiedDisplayName; import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; import java.rmi.RemoteException; @@ -90,13 +92,15 @@ * the metadata provider. * @param hcc, * the reusable hyracks connection. + * @return <code>false</code> if the rebalance was safely skipped * @throws Exception */ - public static void rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames, + public static boolean rebalance(DataverseName dataverseName, String datasetName, Set<String> targetNcNames, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback, boolean forceRebalance) throws Exception { Dataset sourceDataset; Dataset targetDataset; + boolean success = true; // Executes the first Metadata transaction. // Generates the rebalance target files. While doing that, hold read locks on the dataset so // that no one can drop the rebalance source dataset. @@ -108,13 +112,13 @@ // If the source dataset doesn't exist, then it's a no-op. 
if (sourceDataset == null) { - return; + return true; } Set<String> sourceNodes = new HashSet<>(metadataProvider.findNodes(sourceDataset.getNodeGroupName())); if (!forceRebalance && sourceNodes.equals(targetNcNames)) { - return; + return true; } if (!targetNcNames.isEmpty()) { @@ -125,20 +129,25 @@ // The target dataset for rebalance. targetDataset = sourceDataset.getTargetDatasetForRebalance(nodeGroupName); - LOGGER.info("Rebalancing dataset {} from node group {} with nodes {} to node group {} with nodes {}", - sourceDataset.getDatasetName(), sourceDataset.getNodeGroupName(), sourceNodes, - targetDataset.getNodeGroupName(), targetNcNames); + LOGGER.info("Rebalancing {} {} from node group {} with nodes {} to node group {} with nodes {}", + dataset(), getFullyQualifiedDisplayName(sourceDataset), sourceDataset.getNodeGroupName(), + sourceNodes, targetDataset.getNodeGroupName(), targetNcNames); // Rebalances the source dataset into the target dataset. if (sourceDataset.getDatasetType() != DatasetType.EXTERNAL) { - rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback); + success = rebalance(sourceDataset, targetDataset, metadataProvider, hcc, datasetRebalanceCallback); } } else { targetDataset = null; // if this the last NC in the cluster, just drop the dataset purgeDataset(sourceDataset, metadataProvider, hcc); } - // Complete the metadata transaction. - MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + if (success) { + // Complete the metadata transaction. + MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); + } else { + // Abort the metadata transaction, since we failed to rebalance the dataset + MetadataManager.INSTANCE.abortTransaction(mdTxnCtx); + } } catch (Exception e) { abort(e, e, mdTxnCtx); throw e; @@ -146,7 +155,10 @@ if (targetNcNames.isEmpty()) { // Nothing else to do since the dataset was dropped. 
- return; + return true; + } else if (!success) { + LOGGER.info("Dataset {} rebalance was skipped, see above log for reason", datasetName); + return false; } // Up to this point, since the bulk part of a rebalance operation is done, // the following two operations will retry after interrupt and finally rethrow InterruptedException, @@ -165,6 +177,7 @@ runMetadataTransaction(metadataProvider, () -> dropSourceDataset(sourceDataset, metadataProvider, hcc)); }); LOGGER.info("Dataset {} rebalance completed successfully", datasetName); + return true; } @FunctionalInterface @@ -214,13 +227,16 @@ } // Rebalances from the source to the target. - private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider, + private static boolean rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider, IHyracksClientConnection hcc, IDatasetRebalanceCallback datasetRebalanceCallback) throws Exception { // Drops the target dataset files (if any) to make rebalance idempotent. dropDatasetFiles(target, metadataProvider, hcc); // Performs the specified operation before the target dataset is populated. - datasetRebalanceCallback.beforeRebalance(metadataProvider, source, target, hcc); + if (!datasetRebalanceCallback.canRebalance(metadataProvider, source, target, hcc)) { + // the callback indicates that this rebalance should be skipped; short circuit the remaining steps + return false; + } // Creates the rebalance target. createRebalanceTarget(target, metadataProvider, hcc); @@ -233,6 +249,8 @@ // Performs the specified operation after the target dataset is populated. datasetRebalanceCallback.afterRebalance(metadataProvider, source, target, hcc); + + return true; } // Switches the metadata entity from the source dataset to the target dataset. 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java index a8d027f..a50fd4a 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java @@ -20,6 +20,7 @@ package org.apache.asterix.metadata.entities; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; @@ -42,7 +43,7 @@ this.nodeNames = nodeNames; } - public static NodeGroup createOrdered(String groupName, List<String> nodeNames) { + public static NodeGroup createOrdered(String groupName, Collection<String> nodeNames) { List<String> sortedNodeNames = new ArrayList<>(nodeNames); Collections.sort(sortedNodeNames); return new NodeGroup(groupName, sortedNodeNames); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 6529794..b2e6817 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -21,7 +21,6 @@ import static org.apache.asterix.common.utils.IdentifierUtil.dataset; import java.io.DataOutput; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -613,10 +612,10 @@ appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup); if (ng != null) { - nodeGroup = nodeGroup + "_" + UUID.randomUUID().toString(); + nodeGroup = nodeGroup + "_" + UUID.randomUUID(); 
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup); } - MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames))); + MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, ncNames)); return nodeGroup; } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17608 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: neo Gerrit-Change-Id: I1b2cdb56d70ce47c7b15061cc5ecc4af36ed8f61 Gerrit-Change-Number: 17608 Gerrit-PatchSet: 8 Gerrit-Owner: Michael Blow <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Al Hubail <[email protected]> Gerrit-MessageType: merged
