Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1821
Change subject: WIP: Make rebalance idempotent.
......................................................................
WIP: Make rebalance idempotent.
Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
8 files changed, 47 insertions(+), 17 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/1821/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index dae88c2..3ddc929 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -145,6 +145,9 @@
// Rebalances from the source to the target.
private static void rebalance(Dataset source, Dataset target,
MetadataProvider metadataProvider,
IHyracksClientConnection hcc) throws Exception {
+ // Drops the target dataset files (if any) to make rebalance
idempotent.
+ dropDatasetFiles(target, metadataProvider, hcc);
+
// Creates the rebalance target.
createRebalanceTarget(target, metadataProvider, hcc);
@@ -164,6 +167,7 @@
Dataset sourceDataset = MetadataManagerUtil.findDataset(mdTxnCtx,
source.getDataverseName(),
source.getDatasetName());
+
if (sourceDataset == null) {
// The dataset has already been dropped.
// In this case, we should drop the generated target dataset files.
@@ -171,11 +175,11 @@
return;
}
- // Drops the source dataset files.
- dropDatasetFiles(source, metadataProvider, hcc);
-
// Updates the dataset entry in the metadata storage
MetadataManager.INSTANCE.updateDataset(mdTxnCtx, target);
+
+ // Drops the source dataset files.
+ dropDatasetFiles(source, metadataProvider, hcc);
// Drops the metadata entry of source dataset's node group.
String sourceNodeGroup = source.getNodeGroupName();
@@ -259,7 +263,7 @@
List<JobSpecification> jobs = new ArrayList<>();
List<Index> indexes =
metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
dataset.getDatasetName());
for (Index index : indexes) {
- jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider,
dataset));
+ jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider,
dataset, true));
}
for (JobSpecification jobSpec : jobs) {
JobUtils.runJob(hcc, jobSpec, true);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index ca5ddd0..6a53980 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -101,10 +101,17 @@
public static JobSpecification buildDropIndexJobSpec(Index index,
MetadataProvider metadataProvider,
Dataset dataset) throws AlgebricksException {
+ SecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper
+ .createIndexOperationsHelper(dataset, index, metadataProvider,
physicalOptimizationConfig);
+ return secondaryIndexHelper.buildDropJobSpec(false);
+ }
+
+ public static JobSpecification buildDropIndexJobSpec(Index index,
MetadataProvider metadataProvider,
+ Dataset dataset, boolean failSilently) throws AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index,
metadataProvider,
physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec();
+ return secondaryIndexHelper.buildDropJobSpec(failSilently);
}
public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset
dataset, Index index,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 21ec8de..a7db962 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -170,7 +170,7 @@
public abstract JobSpecification buildCompactJobSpec() throws
AlgebricksException;
- public abstract JobSpecification buildDropJobSpec() throws
AlgebricksException;
+ public abstract JobSpecification buildDropJobSpec(boolean failSilently)
throws AlgebricksException;
protected void init() throws AlgebricksException {
payloadSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 907192c..2dcab4f 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -66,14 +66,15 @@
}
@Override
- public JobSpecification buildDropJobSpec() throws AlgebricksException {
+ public JobSpecification buildDropJobSpec(boolean failSilently) throws
AlgebricksException {
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset,
index.getIndexName());
IIndexDataflowHelperFactory dataflowHelperFactory = new
IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first);
// The index drop operation should be persistent regardless of temp
datasets or permanent dataset.
- IndexDropOperatorDescriptor btreeDrop = new
IndexDropOperatorDescriptor(spec, dataflowHelperFactory);
+ IndexDropOperatorDescriptor btreeDrop = new
IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
+ failSilently);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
btreeDrop,
splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
index be10b27..40adc80 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IIndexDataflowHelper.java
@@ -41,9 +41,12 @@
/**
* Removes the index permanently.
*
+ * @param failSilently
+ * {@code true} to fail silently, {@code false} to throw an exception.
+ *
* @throws HyracksDataException
*/
- void destroy() throws HyracksDataException;
+ void destroy(boolean failSilently) throws HyracksDataException;
/**
* @return the index instance
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index 1e959e1..17b7dd2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -57,17 +57,20 @@
synchronized (lcManager) {
index = lcManager.get(resourceRef.getRelativePath());
if (index == null) {
- LocalResource lr = readIndex();
+ LocalResource lr = readIndex(true);
lcManager.register(lr.getPath(), index);
}
lcManager.open(resourceRef.getRelativePath());
}
}
- private LocalResource readIndex() throws HyracksDataException {
+ private LocalResource readIndex(boolean failSilently) throws
HyracksDataException {
// Get local resource
LocalResource lr = getResource();
if (lr == null) {
+ if (failSilently) {
+ return null;
+ }
throw new HyracksDataException(
"Index resource couldn't be found. Has it been created
yet? Was it deleted?");
}
@@ -84,13 +87,16 @@
}
@Override
- public void destroy() throws HyracksDataException {
+ public void destroy(boolean failSilently) throws HyracksDataException {
synchronized (lcManager) {
index = lcManager.get(resourceRef.getRelativePath());
if (index != null) {
lcManager.unregister(resourceRef.getRelativePath());
} else {
- readIndex();
+ LocalResource resource = readIndex(failSilently);
+ if (resource == null) {
+ return;
+ }
}
if (getResourceId() != -1) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index d162be0..18c7107 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -30,16 +30,23 @@
private static final long serialVersionUID = 1L;
private final IIndexDataflowHelperFactory dataflowHelperFactory;
+ private final boolean failSilently;
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexDataflowHelperFactory dataflowHelperFactory) {
+ this(spec, dataflowHelperFactory, false);
+ }
+
+ public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IIndexDataflowHelperFactory dataflowHelperFactory, boolean
failSilently) {
super(spec, 0, 0);
this.dataflowHelperFactory = dataflowHelperFactory;
+ this.failSilently = failSilently;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
- return new IndexDropOperatorNodePushable(dataflowHelperFactory, ctx,
partition);
+ return new IndexDropOperatorNodePushable(dataflowHelperFactory,
failSilently, ctx, partition);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index f6073a4..e7be2c0 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -28,10 +28,12 @@
public class IndexDropOperatorNodePushable extends
AbstractOperatorNodePushable {
private final IIndexDataflowHelper indexHelper;
+ private final boolean failSliently;
- public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory, IHyracksTaskContext ctx,
- int partition) throws HyracksDataException {
+ public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory, boolean failSilently,
+ IHyracksTaskContext ctx, int partition) throws
HyracksDataException {
this.indexHelper = indexHelperFactory.create(ctx, partition);
+ this.failSliently = failSilently;
}
@Override
@@ -50,7 +52,7 @@
@Override
public void initialize() throws HyracksDataException {
- indexHelper.destroy();
+ indexHelper.destroy(failSliently);
}
@Override
--
To view, visit https://asterix-gerrit.ics.uci.edu/1821
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I0d14a07978e106cd497cc35538fafef318b2fcf7
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>