Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2121
Change subject: [ASTERIXDB-1954][STO][RT] Add Index Drop Options
......................................................................
[ASTERIXDB-1954][STO][RT] Add Index Drop Options
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Add options that allow an index drop to ignore a
non-existent index and to retry while the index is in use.
- On interrupt, wait again with a timeout for the
response in RPCInterface.
- Add test case for new index drop options.
Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
A
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
M hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
15 files changed, 326 insertions(+), 32 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/2121/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index ceaf4cf..9e34d70 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -19,8 +19,10 @@
package org.apache.asterix.utils;
import static org.apache.asterix.app.translator.QueryTranslator.abort;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -338,7 +340,8 @@
List<JobSpecification> jobs = new ArrayList<>();
List<Index> indexes =
metadataProvider.getDatasetIndexes(dataset.getDataverseName(),
dataset.getDatasetName());
for (Index index : indexes) {
- jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider,
dataset, true));
+ jobs.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider,
dataset,
+ EnumSet.of(DropOption.IF_EXISTS,
DropOption.WAIT_ON_IN_USE)));
}
for (JobSpecification jobSpec : jobs) {
JobUtils.runJob(hcc, jobSpec, true);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
new file mode 100644
index 0000000..3be26b4
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/IndexDropOperatorNodePushableTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.test.common.TestHelper;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
+import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class IndexDropOperatorNodePushableTest {
+
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new
ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final ARecordType META_TYPE = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final List<Integer> KEY_INDICATORS_LIST = Arrays.asList(new
Integer[] { Index.RECORD_INDICATOR });
+ private static final int DATASET_ID = 101;
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+ private final AtomicBoolean dropFailed = new AtomicBoolean(false);
+
+ @Before
+ public void setUp() throws Exception {
+ System.out.println("SetUp: ");
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.out.println("TearDown");
+ TestHelper.deleteExistingInstanceFiles();
+ }
+
+ @Test
+ public void dropOptionsTest() throws Exception {
+ TestNodeController nc = new TestNodeController(null, false);
+ try {
+ nc.init();
+ StorageComponentProvider storageManager = new
StorageComponentProvider();
+ List<List<String>> partitioningKeys = new ArrayList<>();
+ partitioningKeys.add(Collections.singletonList("key"));
+ Dataset dataset =
+ new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME,
DATA_TYPE_NAME, NODE_GROUP_NAME, null,
+ null, new InternalDatasetDetails(null,
InternalDatasetDetails.PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null,
false), null,
+ DatasetConfig.DatasetType.INTERNAL, DATASET_ID, 0);
+ // create dataset
+ TestNodeController.PrimaryIndexInfo indexInfo =
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, new NoMergePolicyFactory(), null,
+ null, storageManager, KEY_INDEXES,
KEY_INDICATORS_LIST);
+ IndexDataflowHelperFactory helperFactory =
+ new IndexDataflowHelperFactory(nc.getStorageManager(),
indexInfo.getFileSplitProvider());
+ IHyracksTaskContext ctx = nc.createTestContext(true);
+ IIndexDataflowHelper dataflowHelper =
helperFactory.create(ctx.getJobletContext().getServiceContext(), 0);
+ dropInUse(ctx, helperFactory, dataflowHelper);
+ dropInUseWithWait(ctx, helperFactory, dataflowHelper);
+ dropNonExisting(ctx, helperFactory);
+ dropNonExistingWithIfExists(ctx, helperFactory);
+ } finally {
+ nc.deInit();
+ }
+ }
+
+ private void dropInUse(IHyracksTaskContext ctx, IndexDataflowHelperFactory
helperFactory,
+ IIndexDataflowHelper dataflowHelper) throws Exception {
+ dropFailed.set(false);
+ // open the index to make it in-use
+ dataflowHelper.open();
+ // try to drop in-use index (should fail)
+ IndexDropOperatorNodePushable dropInUseOp =
+ new IndexDropOperatorNodePushable(helperFactory,
EnumSet.noneOf(DropOption.class), ctx, 0);
+ try {
+ dropInUseOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ Assert.assertEquals(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
e.getErrorCode());
+ dropFailed.set(true);
+ }
+ Assert.assertTrue(dropFailed.get());
+ }
+
+ private void dropInUseWithWait(IHyracksTaskContext ctx,
IndexDataflowHelperFactory helperFactory,
+ IIndexDataflowHelper dataflowHelper) throws Exception {
+ dropFailed.set(false);
+ // drop with option wait for in-use should be successful once the
index is closed
+ final IndexDropOperatorNodePushable dropWithWaitOp = new
IndexDropOperatorNodePushable(helperFactory,
+ EnumSet.of(DropOption.IF_EXISTS, DropOption.WAIT_ON_IN_USE),
ctx, 0);
+ Thread dropThread = new Thread(() -> {
+ try {
+ dropWithWaitOp.initialize();
+ } catch (HyracksDataException e) {
+ dropFailed.set(true);
+ e.printStackTrace();
+ }
+ });
+ dropThread.start();
+ // wait for the drop thread to start
+ while (dropThread.getState() == Thread.State.NEW) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ // close the index to allow the drop to complete
+ dataflowHelper.close();
+ dropThread.join();
+ Assert.assertFalse(dropFailed.get());
+ }
+
+ private void dropNonExisting(IHyracksTaskContext ctx,
IndexDataflowHelperFactory helperFactory) throws Exception {
+ dropFailed.set(false);
+ // Dropping non-existing index
+ IndexDropOperatorNodePushable dropNonExistingOp =
+ new IndexDropOperatorNodePushable(helperFactory,
EnumSet.noneOf(DropOption.class), ctx, 0);
+ try {
+ dropNonExistingOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ Assert.assertEquals(ErrorCode.INDEX_DOES_NOT_EXIST,
e.getErrorCode());
+ dropFailed.set(true);
+ }
+ Assert.assertTrue(dropFailed.get());
+ }
+
+ private void dropNonExistingWithIfExists(IHyracksTaskContext ctx,
IndexDataflowHelperFactory helperFactory)
+ throws Exception {
+ // Dropping non-existing index with if exists option should be
successful
+ dropFailed.set(false);
+ IndexDropOperatorNodePushable dropNonExistingWithIfExistsOp =
+ new IndexDropOperatorNodePushable(helperFactory,
EnumSet.of(DropOption.IF_EXISTS), ctx, 0);
+ try {
+ dropNonExistingWithIfExistsOp.initialize();
+ } catch (HyracksDataException e) {
+ e.printStackTrace();
+ dropFailed.set(true);
+ }
+ Assert.assertFalse(dropFailed.get());
+ }
+}
\ No newline at end of file
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index e79f002..c53af3e 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -37,6 +37,7 @@
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -129,14 +130,19 @@
IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID);
if (dsr == null || iInfo == null) {
- throw new HyracksDataException("Index with resource ID " +
resourceID + " does not exist.");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
if (iInfo.getReferenceCount() != 0 || (opTracker != null &&
opTracker.getNumActiveOperations() != 0)) {
- throw new HyracksDataException("Cannot remove index while it is
open. (Dataset reference count = "
- + iInfo.getReferenceCount() + ", Operation tracker number
of active operations = "
- + opTracker.getNumActiveOperations() + ")");
+ if (LOGGER.isLoggable(Level.SEVERE)) {
+ final String logMsg = String.format(
+ "Failed to drop in-use index %s. Ref count (%d),
Operation tracker active ops (%d)",
+ resourcePath, iInfo.getReferenceCount(),
opTracker.getNumActiveOperations());
+ LOGGER.severe(logMsg);
+ }
+ throw HyracksDataException
+ .create(ErrorCode.CANNOT_DROP_IN_USE_INDEX,
StoragePathUtil.getIndexNameFromPath(resourcePath));
}
// TODO: use fine-grained counters, one for each index instead of a
single counter per dataset.
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 824e30b..0b9b94b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -129,4 +129,15 @@
throw HyracksDataException.create(e);
}
}
+
+ /**
+ * Gets the index name part in the index relative path.
+ *
+ * @param path
+ * @return The index name
+ */
+ public static String getIndexNameFromPath(String path) {
+ int idx = path.lastIndexOf(DATASET_INDEX_NAME_SEPARATOR);
+ return idx != -1 ? path.substring(idx +
DATASET_INDEX_NAME_SEPARATOR.length()) : path;
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 411f866..cc6923e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,11 @@
*/
package org.apache.asterix.metadata.utils;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+
+import java.util.EnumSet;
import java.util.List;
+import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.OptimizationConfUtil;
@@ -37,6 +41,7 @@
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
+import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
public class IndexUtil {
@@ -103,14 +108,14 @@
Dataset dataset) throws AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper
.createIndexOperationsHelper(dataset, index, metadataProvider,
physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec(false);
+ return
secondaryIndexHelper.buildDropJobSpec(EnumSet.noneOf(DropOption.class));
}
public static JobSpecification buildDropIndexJobSpec(Index index,
MetadataProvider metadataProvider,
- Dataset dataset, boolean failSilently) throws AlgebricksException {
+ Dataset dataset, Set<DropOption> options) throws
AlgebricksException {
SecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper
.createIndexOperationsHelper(dataset, index, metadataProvider,
physicalOptimizationConfig);
- return secondaryIndexHelper.buildDropJobSpec(failSilently);
+ return secondaryIndexHelper.buildDropJobSpec(options);
}
public static JobSpecification buildSecondaryIndexCreationJobSpec(Dataset
dataset, Index index,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index 8fc9ed7..98c47a2 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,9 +19,13 @@
package org.apache.asterix.metadata.utils;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
@@ -170,7 +174,7 @@
public abstract JobSpecification buildCompactJobSpec() throws
AlgebricksException;
- public abstract JobSpecification buildDropJobSpec(boolean failSilently)
throws AlgebricksException;
+ public abstract JobSpecification buildDropJobSpec(Set<DropOption> options)
throws AlgebricksException;
protected void init() throws AlgebricksException {
payloadSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 2dcab4f..b63ea16 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -19,6 +19,10 @@
package org.apache.asterix.metadata.utils;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+
+import java.util.Set;
+
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -66,15 +70,14 @@
}
@Override
- public JobSpecification buildDropJobSpec(boolean failSilently) throws
AlgebricksException {
+ public JobSpecification buildDropJobSpec(Set<DropOption> options) throws
AlgebricksException {
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset,
index.getIndexName());
IIndexDataflowHelperFactory dataflowHelperFactory = new
IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first);
// The index drop operation should be persistent regardless of temp
datasets or permanent dataset.
- IndexDropOperatorDescriptor btreeDrop = new
IndexDropOperatorDescriptor(spec, dataflowHelperFactory,
- failSilently);
+ IndexDropOperatorDescriptor btreeDrop = new
IndexDropOperatorDescriptor(spec, dataflowHelperFactory, options);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
btreeDrop,
splitsAndConstraint.second);
spec.addRoot(btreeDrop);
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a071345..0fe04b7 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -133,6 +133,8 @@
public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
+ public static final int INDEX_DOES_NOT_EXIST = 100;
+ public static final int CANNOT_DROP_IN_USE_INDEX = 101;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ae579b4..70ce1e7 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -116,5 +116,7 @@
97 = Illegal attempt to exit empty component
98 = A flush operation has failed
99 = A merge operation has failed
+100 = Index does not exist
+101 = Cannot drop in-use index (%1$s)
10000 = The given rule collection %1$s is not an instance of the List class.
diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
index b49e99e..71bbcb6f 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/pom.xml
@@ -49,5 +49,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
index ba1c9a4..7c3d85f 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/api/RPCInterface.java
@@ -20,6 +20,7 @@
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hyracks.ipc.exceptions.IPCException;
@@ -64,6 +65,7 @@
private static class Request {
+ public static final long RESPONSE_TIMEOUT_ON_INTERRUPT =
TimeUnit.SECONDS.toMillis(5);
private boolean pending;
private Object result;
@@ -90,7 +92,16 @@
synchronized Object getResponse() throws Exception {
while (pending) {
- wait();
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // try to wait again with a timeout
+ wait(RESPONSE_TIMEOUT_ON_INTERRUPT);
+ Thread.currentThread().interrupt();
+ if (pending) {
+ throw e;
+ }
+ }
}
if (exception != null) {
throw exception;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
index af843d2..abfe0bb 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java
@@ -23,6 +23,7 @@
import java.util.logging.Logger;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
@@ -72,8 +73,7 @@
// Get local resource
LocalResource lr = getResource();
if (lr == null) {
- throw new HyracksDataException(
- "Index resource couldn't be found. Has it been created
yet? Was it deleted?");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
IResource resource = lr.getResource();
index = resource.createInstance(ctx);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
index 18c7107..032e758 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorDescriptor.java
@@ -19,6 +19,9 @@
package org.apache.hyracks.storage.am.common.dataflow;
+import java.util.EnumSet;
+import java.util.Set;
+
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -28,25 +31,30 @@
public class IndexDropOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
+ public enum DropOption {
+ IF_EXISTS,
+ WAIT_ON_IN_USE
+ }
+
private static final long serialVersionUID = 1L;
private final IIndexDataflowHelperFactory dataflowHelperFactory;
- private final boolean failSilently;
+ private final Set<DropOption> options;
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexDataflowHelperFactory dataflowHelperFactory) {
- this(spec, dataflowHelperFactory, false);
+ this(spec, dataflowHelperFactory, EnumSet.noneOf(DropOption.class));
}
public IndexDropOperatorDescriptor(IOperatorDescriptorRegistry spec,
- IIndexDataflowHelperFactory dataflowHelperFactory, boolean
failSilently) {
+ IIndexDataflowHelperFactory dataflowHelperFactory, Set<DropOption>
options) {
super(spec, 0, 0);
this.dataflowHelperFactory = dataflowHelperFactory;
- this.failSilently = failSilently;
+ this.options = options;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
- return new IndexDropOperatorNodePushable(dataflowHelperFactory,
failSilently, ctx, partition);
+ return new IndexDropOperatorNodePushable(dataflowHelperFactory,
options, ctx, partition);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
index 7c2021b..ea1635a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDropOperatorNodePushable.java
@@ -19,6 +19,17 @@
package org.apache.hyracks.storage.am.common.dataflow;
+import static
org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_DROP_IN_USE_INDEX;
+import static org.apache.hyracks.api.exceptions.ErrorCode.INDEX_DOES_NOT_EXIST;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.IF_EXISTS;
+import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption.WAIT_ON_IN_USE;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -27,17 +38,22 @@
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
public class IndexDropOperatorNodePushable extends
AbstractOperatorNodePushable {
- private final IIndexDataflowHelper indexHelper;
- private final boolean failSliently;
- public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory, boolean failSilently,
+ private static final Logger LOGGER =
Logger.getLogger(IndexDropOperatorNodePushable.class.getName());
+ private static final long DROP_ATTEMPT_WAIT_TIME_MILLIS =
TimeUnit.SECONDS.toMillis(1);
+ private final IIndexDataflowHelper indexHelper;
+ private final Set<DropOption> options;
+ private long maxWaitTimeMillis = TimeUnit.SECONDS.toMillis(30);
+
+ public IndexDropOperatorNodePushable(IIndexDataflowHelperFactory
indexHelperFactory, Set<DropOption> options,
IHyracksTaskContext ctx, int partition) throws
HyracksDataException {
this.indexHelper =
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(),
partition);
- this.failSliently = failSilently;
+ this.options = options;
}
@Override
public void deinitialize() throws HyracksDataException {
+ // no op
}
@Override
@@ -52,16 +68,51 @@
@Override
public void initialize() throws HyracksDataException {
- try {
- indexHelper.destroy();
- } catch (HyracksDataException e) {
- if (!failSliently) {
+ dropIndex();
+ }
+
+ @Override
+ public void setOutputFrameWriter(int index, IFrameWriter writer,
RecordDescriptor recordDesc) {
+ // no op
+ }
+
+ private void dropIndex() throws HyracksDataException {
+ while (true) {
+ try {
+ indexHelper.destroy();
+ return;
+ } catch (HyracksDataException e) {
+ if (isIgnorable(e)) {
+ LOGGER.log(Level.INFO, e, () -> "Ignoring exception on
drop");
+ return;
+ }
+ if (canRetry(e)) {
+ LOGGER.log(Level.INFO, e, () -> "Retrying drop on
exception");
+ continue;
+ }
throw e;
}
}
}
- @Override
- public void setOutputFrameWriter(int index, IFrameWriter writer,
RecordDescriptor recordDesc) {
+ private boolean isIgnorable(HyracksDataException e) {
+ return e.getErrorCode() == INDEX_DOES_NOT_EXIST &&
options.contains(IF_EXISTS);
+ }
+
+ private boolean canRetry(HyracksDataException e) throws
HyracksDataException {
+ if (e.getErrorCode() == CANNOT_DROP_IN_USE_INDEX &&
options.contains(WAIT_ON_IN_USE)) {
+ if (maxWaitTimeMillis <= 0) {
+ return false;
+ }
+ try {
+ TimeUnit.MILLISECONDS.sleep(DROP_ATTEMPT_WAIT_TIME_MILLIS);
+ maxWaitTimeMillis -= DROP_ATTEMPT_WAIT_TIME_MILLIS;
+ return true;
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e1);
+ }
+ }
+ return false;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
index de2f742..fe64e9f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
import org.apache.hyracks.storage.common.IIndex;
@@ -223,12 +224,12 @@
public void unregister(String resourcePath) throws HyracksDataException {
IndexInfo info = indexInfos.get(resourcePath);
if (info == null) {
- throw new HyracksDataException("Index with resource name " +
resourcePath + " does not exist.");
+ throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
if (info.referenceCount != 0) {
indexInfos.put(resourcePath, info);
- throw new HyracksDataException("Cannot remove index while it is
open.");
+ throw
HyracksDataException.create(ErrorCode.CANNOT_DROP_IN_USE_INDEX, resourcePath);
}
if (info.isOpen) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/2121
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id6f8fa52489bbe64d2f48c5c3d0a07be60f30b1b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>