Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-1954][STO][RT] Add Index Drop Options ......................................................................
[ASTERIXDB-1954][STO][RT] Add Index Drop Options - user model changes: no - storage format changes: no - interface changes: no Details: - Add options to allow drop index to ignore index does not exist and retry on index in-use. - Add test case for new index drop options. Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b Reviewed-on: https://asterix-gerrit.ics.uci.edu/2121 Reviewed-by: Michael Blow <[email protected]> Integration-Tests: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java M hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java 13 files changed, 309 insertions(+), 31 deletions(-) Approvals: Michael Blow: Looks good to me, approved; Verified; Verified diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index ceaf4cf..9e34d70 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -19,8 +19,10 @@ package org.apache.asterix.utils; import static org.apache.asterix.app.translator.QueryTranslator.abort; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -338,7 +340,8 @@ List<JobSpecification> jobs = new ArrayList<>(); List<Index> indexes = metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()); for (Index index : indexes) { - jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, true)); + jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset, + EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE))); } for (JobSpecification jobSpec : jobs) { JobUtils.runJob(hcc, jobSpec, true); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java new file mode 100644 index 0000000..2bac49e --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.storage; + +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.asterix.app.bootstrap.TestNodeController; +import org.apache.asterix.common.config.DatasetConfig; +import org.apache.asterix.file.StorageComponentProvider; +import org.apache.asterix.metadata.entities.Dataset; +import org.apache.asterix.metadata.entities.Index; +import org.apache.asterix.metadata.entities.InternalDatasetDetails; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.BuiltinType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.test.common.TestHelper; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; +import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable; +import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class IndexDropOperatorNodePushableTest { + + private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 }; + private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" }, + new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false); + private static final ARecordType META_TYPE = null; + private static final int[] KEY_INDEXES = { 0 }; + private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new Integer[] { Index.RECORD_INDICATOR }); + private static final int DATASET_ID = 101; + private static final String DATAVERSE_NAME = "TestDV"; + private static final String DATASET_NAME = "TestDS"; + private static final String DATA_TYPE_NAME = "DUMMY"; + private static final String NODE_GROUP_NAME = "DEFAULT"; + private final AtomicBoolean dropFailed = new AtomicBoolean(false); + + @Before + public void setUp() throws Exception { + System.out.println("SetUp: "); + TestHelper.deleteExistingInstanceFiles(); + } + + @After + public void tearDown() throws Exception { + System.out.println("TearDown"); + TestHelper.deleteExistingInstanceFiles(); + } + + @Test + public void dropOptionsTest() throws Exception { + TestNodeController nc = new TestNodeController(null, false); + try { + nc.init(); + StorageComponentProvider storageManager = new StorageComponentProvider(); + List<List<String>> partitioningKeys = new ArrayList<>(); + partitioningKeys.add(Collections.singletonList("key")); + Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME, + NoMergePolicyFactory.NAME, null, + new InternalDatasetDetails(null, InternalDatasetDetails.PartitioningStrategy.HASH, partitioningKeys, + null, null, null, false, null, false), null, DatasetConfig.DatasetType.INTERNAL, DATASET_ID, + 0); + // create dataset + TestNodeController.PrimaryIndexInfo indexInfo = + nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, null, storageManager, KEY_INDEXES, + KEY_INDICATORS_LIST); + IndexDataflowHelperFactory helperFactory = + new IndexDataflowHelperFactory(nc.getStorageManager(), indexInfo.getFileSplitProvider()); + IHyracksTaskContext ctx = nc.createTestContext(true); + IIndexDataflowHelper dataflowHelper = helperFactory.create(ctx.getJobletContext().getServiceContext(), 0); + dropInUse(ctx, helperFactory, dataflowHelper); + dropInUseWithWait(ctx, helperFactory, dataflowHelper); + dropNonExisting(ctx, helperFactory); + dropNonExistingWithIfExists(ctx, helperFactory); + } finally { + nc.deInit(); + } + } + + private void dropInUse(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory, + IIndexDataflowHelper dataflowHelper) throws Exception { + dropFailed.set(false); + // open the index to make it in-use + dataflowHelper.open(); + // try to drop in-use index (should fail) + IndexDropOperatorNodePushable dropInUseOp = + new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0); + try { + dropInUseOp.initialize(); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.assertEquals(ErrorCode.CANNOT_DROP_IN_USE_INDEX, e.getErrorCode()); + dropFailed.set(true); + } + Assert.assertTrue(dropFailed.get()); + } + + private void dropInUseWithWait(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory, + IIndexDataflowHelper dataflowHelper) throws Exception { + dropFailed.set(false); + // drop with option wait for in-use should be successful once the index is closed + final IndexDropOperatorNodePushable dropWithWaitOp = new IndexDropOperatorNodePushable(helperFactory, + EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE), ctx, 0); + Thread dropThread = new Thread(() -> { + try { + dropWithWaitOp.initialize(); + } catch (HyracksDataException e) { + dropFailed.set(true); + e.printStackTrace(); + } + }); + dropThread.start(); + // wait for the drop thread to start + while (dropThread.getState() == Thread.State.NEW) { + TimeUnit.MILLISECONDS.sleep(100); + } + // close the index to allow the drop to complete + dataflowHelper.close(); + dropThread.join(); + Assert.assertFalse(dropFailed.get()); + } + + private void dropNonExisting(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) throws Exception { + dropFailed.set(false); + // Dropping non-existing index + IndexDropOperatorNodePushable dropNonExistingOp = + new IndexDropOperatorNodePushable(helperFactory, EnumSet.noneOf(DropOption.class), ctx, 0); + try { + dropNonExistingOp.initialize(); + } catch (HyracksDataException e) { + e.printStackTrace(); + Assert.assertEquals(ErrorCode.INDEX_DOES_NOT_EXIST, e.getErrorCode()); + dropFailed.set(true); + } + Assert.assertTrue(dropFailed.get()); + } + + private void dropNonExistingWithIfExists(IHyracksTaskContext ctx, IndexDataflowHelperFactory helperFactory) + throws Exception { + // Dropping non-existing index with if exists option should be successful + dropFailed.set(false); + IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp = + new IndexDropOperatorNodePushable(helperFactory, EnumSet.of(DropOption.IF_EXISTS), ctx, 0); + try { + dropNonExistingWithIfExistsOp.initialize(); + } catch (HyracksDataException e) { + e.printStackTrace(); + dropFailed.set(true); + } + Assert.assertFalse(dropFailed.get()); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index e79f002..c53af3e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -37,6 +37,7 @@ import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.ILogManager; import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.common.utils.TransactionUtil; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -129,14 +130,19 @@ IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID); if (dsr == null || iInfo == null) { - throw new HyracksDataException("Index with resource ID " + resourceID + " does not exist."); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); } PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) { - throw new HyracksDataException("Cannot remove index while it is open. (Dataset reference count = " - + iInfo.getReferenceCount() + ", Operation tracker number of active operations = " - + opTracker.getNumActiveOperations() + ")"); + if (LOGGER.isLoggable(Level.SEVERE)) { + final String logMsg = String.format( + "Failed to drop in-use index %s. Ref count (%d), Operation tracker active ops (%d)", + resourcePath, iInfo.getReferenceCount(), opTracker.getNumActiveOperations()); + LOGGER.severe(logMsg); + } + throw HyracksDataException + .create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, StoragePathUtil.getIndexNameFromPath(resourcePath)); } // TODO: use fine-grained counters, one for each index instead of a single counter per dataset. diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 824e30b..0b9b94b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -129,4 +129,15 @@ throw HyracksDataException.create(e); } } + + /** + * Gets the index name part in the index relative path. + * + * @param path + * @return The index name + */ + public static String getIndexNameFromPath(String path) { + int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR); + return idx != -1 ? path.substring(idx + DATASET_INDEX_NAME_SEPARATOR.length()) : path; + } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index 411f866..cc6923e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -18,7 +18,11 @@ */ package org.apache.asterix.metadata.utils; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*; + +import java.util.EnumSet; import java.util.List; +import java.util.Set; import org.apache.asterix.common.config.DatasetConfig; import org.apache.asterix.common.config.OptimizationConfUtil; @@ -37,6 +41,7 @@ import org.apache.hyracks.api.dataflow.value.ITypeTraits; import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor; public class IndexUtil { @@ -103,14 +108,14 @@ Dataset dataset) throws AlgebricksException { SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig); - return secondaryIndexHelper.buildDropJobSpec(false); + return secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class)); } public static JobSpecification buildDropIndexJobSpec(Index index, MetadataProvider metadataProvider, - Dataset dataset, boolean failSilently) throws AlgebricksException { + Dataset dataset, Set<DropOption> options) throws AlgebricksException { SecondaryIndexOperationsHelper secondaryIndexHelper = SecondaryIndexOperationsHelper .createIndexOperationsHelper(dataset, index, metadataProvider, physicalOptimizationConfig); - return secondaryIndexHelper.buildDropJobSpec(failSilently); + return secondaryIndexHelper.buildDropJobSpec(options); } public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset dataset, Index index, diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java index 8fc9ed7..98c47a2 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java @@ -19,9 +19,13 @@ package org.apache.asterix.metadata.utils; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; + import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; @@ -170,7 +174,7 @@ public abstract JobSpecification buildCompactJobSpec() throws AlgebricksException; - public abstract JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException; + public abstract JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException; protected void init() throws AlgebricksException { payloadSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java index 2dcab4f..b63ea16 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java @@ -19,6 +19,10 @@ package org.apache.asterix.metadata.utils; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; + +import java.util.Set; + import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -66,15 +70,14 @@ } @Override - public JobSpecification buildDropJobSpec(boolean failSilently) throws AlgebricksException { + public JobSpecification buildDropJobSpec(Set<DropOption> options) throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadataProvider.getSplitProviderAndConstraints(dataset, index.getIndexName()); IIndexDataflowHelperFactory dataflowHelperFactory = new IndexDataflowHelperFactory( metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first); // The index drop operation should be persistent regardless of temp datasets or permanent dataset. - IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, - failSilently); + IndexDropOperatorDescriptor btreeDrop = new IndexDropOperatorDescriptor(spec, dataflowHelperFactory, options); AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeDrop, splitsAndConstraint.second); spec.addRoot(btreeDrop); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index 42da1d7..7926469 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -137,6 +137,8 @@ public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101; public static final int VBC_ALREADY_OPEN = 102; public static final int VBC_ALREADY_CLOSED = 103; + public static final int INDEX_DOES_NOT_EXIST = 104; + public static final int CANNOT_DROP_IN_USE_INDEX = 105; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index bc83f8c..330cf1f 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -120,5 +120,7 @@ 101 = Page %1$s does not exist in file %2$s 102 = Failed to open virtual buffer cache since it is already open 103 = Failed to close virtual buffer cache since it is already closed +104 = Index does not exist +105 = Cannot drop in-use index (%1$s) 10000 = The given rule collection %1$s is not an instance of the List class. diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java index af843d2..abfe0bb 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java @@ -23,6 +23,7 @@ import java.util.logging.Logger; import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; @@ -72,8 +73,7 @@ // Get local resource LocalResource lr = getResource(); if (lr == null) { - throw new HyracksDataException( - "Index resource couldn't be found. Has it been created yet? Was it deleted?"); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); } IResource resource = lr.getResource(); index = resource.createInstance(ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java index 18c7107..032e758 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java @@ -19,6 +19,9 @@ package org.apache.hyracks.storage.am.common.dataflow; +import java.util.EnumSet; +import java.util.Set; + import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; @@ -28,25 +31,30 @@ public class IndexDropOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor { + public enum DropOption { + IF_EXISTS, + WAIT_ON_IN_USE + } + private static final long serialVersionUID = 1L; private final IIndexDataflowHelperFactory dataflowHelperFactory; - private final boolean failSilently; + private final Set<DropOption> options; public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec, IIndexDataflowHelperFactory dataflowHelperFactory) { - this(spec, dataflowHelperFactory, false); + this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class)); } public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec, - IIndexDataflowHelperFactory dataflowHelperFactory, boolean failSilently) { + IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption> options) { super(spec, 0, 0); this.dataflowHelperFactory = dataflowHelperFactory; - this.failSilently = failSilently; + this.options = options; } @Override public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx, IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException { - return new IndexDropOperatorNodePushable(dataflowHelperFactory, failSilently, ctx, partition); + return new IndexDropOperatorNodePushable(dataflowHelperFactory, options, ctx, partition); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java index 7c2021b..ea1635a 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java @@ -19,6 +19,17 @@ package org.apache.hyracks.storage.am.common.dataflow; +import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_DROP_IN_USE_INDEX; +import static org.apache.hyracks.api.exceptions.ErrorCode.INDEX_DOES_NOT_EXIST; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.IF_EXISTS; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.WAIT_ON_IN_USE; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -27,17 +38,22 @@ import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper; public class IndexDropOperatorNodePushable extends AbstractOperatorNodePushable { - private final IIndexDataflowHelper indexHelper; - private final boolean failSliently; - public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, boolean failSilently, + private static final Logger LOGGER = Logger.getLogger(IndexDropOperatorNodePushable.class.getName()); + private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS = TimeUnit.SECONDS.toMillis(1); + private final IIndexDataflowHelper indexHelper; + private final Set<DropOption> options; + private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30); + + public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory indexHelperFactory, Set<DropOption> options, IHyracksTaskContext ctx, int partition) throws HyracksDataException { this.indexHelper = indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partition); - this.failSliently = failSilently; + this.options = options; } @Override public void deinitialize() throws HyracksDataException { + // no op } @Override @@ -52,16 +68,51 @@ @Override public void initialize() throws HyracksDataException { - try { - indexHelper.destroy(); - } catch (HyracksDataException e) { - if (!failSliently) { + dropIndex(); + } + + @Override + public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + // no op + } + + private void dropIndex() throws HyracksDataException { + while (true) { + try { + indexHelper.destroy(); + return; + } catch (HyracksDataException e) { + if (isIgnorable(e)) { + LOGGER.log(Level.INFO, e, () -> "Ignoring exception on drop"); + return; + } + if (canRetry(e)) { + LOGGER.log(Level.INFO, e, () -> "Retrying drop on exception"); + continue; + } throw e; } } } - @Override - public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) { + private boolean isIgnorable(HyracksDataException e) { + return e.getErrorCode() == INDEX_DOES_NOT_EXIST && options.contains(IF_EXISTS); + } + + private boolean canRetry(HyracksDataException e) throws HyracksDataException { + if (e.getErrorCode() == CANNOT_DROP_IN_USE_INDEX && options.contains(WAIT_ON_IN_USE)) { + if (maxWaitTimeMillis <= 0) { + return false; + } + try { + TimeUnit.MILLISECONDS.sleep(DROP_ATTEMPT_WAIT_TIME_MILLIS); + maxWaitTimeMillis -= DROP_ATTEMPT_WAIT_TIME_MILLIS; + return true; + } catch (InterruptedException e1) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e1); + } + } + return false; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java index de2f742..fe64e9f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; import org.apache.hyracks.storage.common.IIndex; @@ -223,12 +224,12 @@ public void unregister(String resourcePath) throws HyracksDataException { IndexInfo info = indexInfos.get(resourcePath); if (info == null) { - throw new HyracksDataException("Index with resource name " + resourcePath + " does not exist."); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); } if (info.referenceCount != 0) { indexInfos.put(resourcePath, info); - throw new HyracksDataException("Cannot remove index while it is open."); + throw HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, resourcePath); } if (info.isOpen) { -- To view, visit https://asterix-gerrit.ics.uci.edu/2121 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b Gerrit-PatchSet: 4 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Till Westmann <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
