From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976 )
Change subject: WIP: Fix concurrent index builder
......................................................................
WIP: Fix concurrent index builder
Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda
---
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
6 files changed, 64 insertions(+), 36 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/76/17976/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 93802c2..43f40eb 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -339,7 +339,8 @@
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(dataset);
- FileSplit[] fs =
partitioningProperties.getSplitsProvider().getFileSplits();
+ IFileSplitProvider splitsProvider =
partitioningProperties.getSplitsProvider();
+ FileSplit[] fs = splitsProvider.getFileSplits();
StringBuilder sb = new StringBuilder();
for (FileSplit f : fs) {
sb.append(f).append(" ");
@@ -349,19 +350,37 @@
DatasetUtil.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
// prepare a LocalResourceMetadata which will be stored in NC's local
resource
// repository
- IResourceFactory resourceFactory =
dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
- compactionInfo.first, compactionInfo.second);
- IndexBuilderFactory indexBuilderFactory =
- new
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- partitioningProperties.getSplitsProvider(),
resourceFactory, true);
- IndexCreateOperatorDescriptor indexCreateOp = new
IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
- partitioningProperties.getComputeStorageMap());
+ int[][] computeStorageMap =
partitioningProperties.getComputeStorageMap();
+ IndexBuilderFactory[][] indexBuilderFactories =
getIndexBuilderFactories(dataset, metadataProvider, index,
+ itemType, metaItemType, splitsProvider, compactionInfo.first,
compactionInfo.second, computeStorageMap);
+ IndexCreateOperatorDescriptor indexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
computeStorageMap);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
indexCreateOp,
partitioningProperties.getConstraints());
spec.addRoot(indexCreateOp);
return spec;
}
+ public static IndexBuilderFactory[][] getIndexBuilderFactories(Dataset
dataset, MetadataProvider metadataProvider,
+ Index index, ARecordType itemType, ARecordType metaItemType,
IFileSplitProvider fileSplitProvider,
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String>
mergePolicyProperties,
+ int[][] computeStorageMap) throws AlgebricksException {
+ IndexBuilderFactory[][] indexBuilderFactories = new
IndexBuilderFactory[computeStorageMap.length][];
+ for (int i = 0; i < computeStorageMap.length; i++) {
+ int len = computeStorageMap[i].length;
+ indexBuilderFactories[i] = new IndexBuilderFactory[len];
+ for (int k = 0; k < len; k++) {
+ IResourceFactory resourceFactory =
dataset.getResourceFactory(metadataProvider, index, itemType,
+ metaItemType, mergePolicyFactory,
mergePolicyProperties);
+ IndexBuilderFactory indexBuilderFactory =
+ new
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
+ fileSplitProvider, resourceFactory, true);
+ indexBuilderFactories[i][k] = indexBuilderFactory;
+ }
+ }
+ return indexBuilderFactories;
+ }
+
public static JobSpecification compactDatasetJobSpec(Dataverse dataverse,
String datasetName,
MetadataProvider metadataProvider) throws AlgebricksException {
DataverseName dataverseName = dataverse.getDataverseName();
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
index 3dd84ec..e874663 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SampleOperationsHelper.java
@@ -27,7 +27,6 @@
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.OptimizationConfUtil;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -81,14 +80,12 @@
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import
org.apache.hyracks.dataflow.std.group.AbstractAggregatorDescriptorFactory;
import
org.apache.hyracks.dataflow.std.group.sort.SortGroupByOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
-import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -158,13 +155,11 @@
@Override
public JobSpecification buildCreationJobSpec() throws AlgebricksException {
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- IStorageComponentProvider storageComponentProvider =
metadataProvider.getStorageComponentProvider();
- IResourceFactory resourceFactory =
dataset.getResourceFactory(metadataProvider, sampleIdx, itemType, metaType,
- mergePolicyFactory, mergePolicyProperties);
- IIndexBuilderFactory indexBuilderFactory = new
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- fileSplitProvider, resourceFactory, true);
+ IndexBuilderFactory[][] indexBuilderFactories =
+ DatasetUtil.getIndexBuilderFactories(dataset,
metadataProvider, sampleIdx, itemType, metaType,
+ fileSplitProvider, mergePolicyFactory,
mergePolicyProperties, computeStorageMap);
IndexCreateOperatorDescriptor indexCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
computeStorageMap);
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
computeStorageMap);
indexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
indexCreateOp, partitionConstraint);
spec.addRoot(indexCreateOp);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 0cda625..62352bf 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
import java.util.Set;
import org.apache.asterix.common.cluster.PartitioningProperties;
-import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
@@ -34,14 +33,12 @@
import
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
-import org.apache.hyracks.storage.common.IResourceFactory;
public abstract class SecondaryTreeIndexOperationsHelper extends
SecondaryIndexOperationsHelper {
@@ -53,14 +50,13 @@
@Override
public JobSpecification buildCreationJobSpec() throws AlgebricksException {
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- IStorageComponentProvider storageComponentProvider =
metadataProvider.getStorageComponentProvider();
- IResourceFactory resourceFactory =
dataset.getResourceFactory(metadataProvider, index, itemType, metaType,
- mergePolicyFactory, mergePolicyProperties);
- IIndexBuilderFactory indexBuilderFactory = new
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
- secondaryFileSplitProvider, resourceFactory, true);
PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(dataset);
- IndexCreateOperatorDescriptor secondaryIndexCreateOp = new
IndexCreateOperatorDescriptor(spec,
- indexBuilderFactory,
partitioningProperties.getComputeStorageMap());
+ int[][] computeStorageMap =
partitioningProperties.getComputeStorageMap();
+ IndexBuilderFactory[][] indexBuilderFactories =
+ DatasetUtil.getIndexBuilderFactories(dataset,
metadataProvider, index, itemType, metaType,
+ secondaryFileSplitProvider, mergePolicyFactory,
mergePolicyProperties, computeStorageMap);
+ IndexCreateOperatorDescriptor secondaryIndexCreateOp =
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
computeStorageMap);
secondaryIndexCreateOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
secondaryIndexCreateOp,
secondaryPartitionConstraint);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
index d90a212..4948a66 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/btree/AbstractBTreeOperatorTest.java
@@ -120,8 +120,10 @@
IResourceFactory primaryResourceFactory =
createPrimaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider,
primaryResourceFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new
IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -168,8 +170,10 @@
IResourceFactory secondaryResourceFactory =
createSecondaryResourceFactory();
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager,
secondarySplitProvider, secondaryResourceFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new
IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
index 1634b2d..a8f2ef1 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/AbstractRTreeOperatorTest.java
@@ -242,8 +242,10 @@
pageManagerFactory, null, null);
IIndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager, primarySplitProvider,
btreeFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new
IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor primaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
primaryCreateOp, NC1_ID);
spec.addRoot(primaryCreateOp);
runTest(spec);
@@ -305,8 +307,10 @@
JobSpecification spec = new JobSpecification();
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(storageManager,
secondarySplitProvider, rtreeFactory, false);
+ IIndexBuilderFactory[][] indexBuilderFactories = new
IIndexBuilderFactory[1][1];
+ indexBuilderFactories[0][0] = indexBuilderFactory;
IndexCreateOperatorDescriptor secondaryCreateOp =
- new IndexCreateOperatorDescriptor(spec, indexBuilderFactory,
TestUtil.getPartitionsMap(1));
+ new IndexCreateOperatorDescriptor(spec, indexBuilderFactories,
TestUtil.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondaryCreateOp, NC1_ID);
spec.addRoot(secondaryCreateOp);
runTest(spec);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
index 61b600a..32636e8 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexCreateOperatorDescriptor.java
@@ -31,13 +31,13 @@
public class IndexCreateOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
private static final long serialVersionUID = 2L;
- private final IIndexBuilderFactory indexBuilderFactory;
+ private final IIndexBuilderFactory[][] indexBuilderFactories;
private final int[][] partitionsMap;
- public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
IIndexBuilderFactory indexBuilderFactory,
- int[][] partitionsMap) {
+ public IndexCreateOperatorDescriptor(IOperatorDescriptorRegistry spec,
+ IIndexBuilderFactory[][] indexBuilderFactories, int[][]
partitionsMap) {
super(spec, 0, 0);
- this.indexBuilderFactory = indexBuilderFactory;
+ this.indexBuilderFactories = indexBuilderFactories;
this.partitionsMap = partitionsMap;
}
@@ -45,9 +45,10 @@
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
int[] storagePartitions = partitionsMap[partition];
+ IIndexBuilderFactory[] partitionIndexBuilderFactories =
indexBuilderFactories[partition];
IIndexBuilder[] indexBuilders = new
IIndexBuilder[storagePartitions.length];
for (int i = 0; i < storagePartitions.length; i++) {
- indexBuilders[i] = indexBuilderFactory.create(ctx,
storagePartitions[i]);
+ indexBuilders[i] = partitionIndexBuilderFactories[i].create(ctx,
storagePartitions[i]);
}
return new IndexCreateOperatorNodePushable(indexBuilders);
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17976
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I4c3b0b982b206ac3eea3653c68e3fecc31145cda
Gerrit-Change-Number: 17976
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange