Yingyi Bu has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1768
Change subject: WIP: Add a dataset relocation REST API.
......................................................................
WIP: Add a dataset relocation REST API.
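
The new endpoint accepts POST requests that name the dataset to move and
the target nodes, in the form exercised by the new runtime test:

  POST /rebalance?dataverseName=tpch&datasetName=LineItem&nodes="asterix_nc1"

On success it answers {"results":"successful"}; after the switch, the
dataset's file splits live under a path that includes its rebalance id,
e.g. storage/partition_0/tpch/1/LineItem_idx_LineItem.
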
Change-Id: Ibda35252031fc4940972f0f19bbf796cadfa53d6
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
41 files changed, 1,023 insertions(+), 306 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/68/1768/1
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 20c69c4..2dd4c3d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -88,15 +88,8 @@
         int[] primaryKeyFields =
                 JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
         //get dataset splits
-        FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
-                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
-                metadataProvider.isTemporaryDatasetWriteJob());
-        int[] datasetPartitions = new int[splitsForDataset.length];
-        for (int i = 0; i < splitsForDataset.length; i++) {
-            datasetPartitions[i] = i;
-        }
-        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
-                datasetPartitions, isSink);
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields,
+                isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index 421ee0e..396665e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -105,8 +105,7 @@
                 return;
             }
             boolean temp = dataset.getDatasetDetails().isTemp();
-            FileSplit[] fileSplits =
-                    metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
+            FileSplit[] fileSplits = metadataProvider.splitsForIndex(mdTxnCtx, dataset, datasetName);
             ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
                     dataset.getItemTypeName());
             List<List<String>> primaryKeys = dataset.getPrimaryKeys();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
new file mode 100644
index 0000000..56a40bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -0,0 +1,101 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.utils.RebalanceUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class RebalanceApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(RebalanceApiServlet.class.getName());
+    private ICcApplicationContext appCtx;
+
+    public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+        super(ctx, paths);
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) {
+        response.setStatus(HttpResponseStatus.OK);
+        try {
+            HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure setting content type", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        PrintWriter out = response.writer();
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jsonResponse = om.createObjectNode();
+        try {
+            // Gets the dataverse, dataset, and target nodes for rebalance.
+            String dataverseName = request.getParameter("dataverseName");
+            String datasetName = request.getParameter("datasetName");
+            if (dataverseName == null || datasetName == null) {
+                jsonResponse.put("error", "Parameter dataverseName or datasetName is null.");
+                sendResult(out, jsonResponse);
+                return;
+            }
+            String[] nodes = StringUtils.strip(request.getParameter("nodes"), "\"'").split(",");
+
+            // Rebalances the given dataset from its current locations to the target nodes.
+            IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
+            RebalanceUtil.rebalance(dataverseName, datasetName, Arrays.asList(nodes), metadataProvider, hcc);
+
+            // Reports success.
+            jsonResponse.put("results", "successful");
+            sendResult(out, jsonResponse);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Failure handling a request", e);
+            jsonResponse.put("results", e.getMessage());
+            sendResult(out, jsonResponse);
+            e.printStackTrace(out);
+        }
+    }
+
+    private void sendResult(PrintWriter out, ObjectNode jsonResponse) {
+        out.write(jsonResponse.toString());
+        out.flush();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6a2b4e0..a5c4a90 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -612,9 +612,7 @@
                         MetadataUtil.PENDING_ADD_OP);
                 MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
                 if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                    Dataverse dataverse =
-                            MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
-                    JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
+                    JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
                     // #. make metadataTxn commit before calling runJob.
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2429,7 +2427,6 @@
     }

     protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
-
         NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
         String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 57cc340..61d93f5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -46,6 +46,7 @@
import org.apache.asterix.api.http.server.QueryServiceServlet;
import org.apache.asterix.api.http.server.QueryStatusApiServlet;
import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
+import org.apache.asterix.api.http.server.RebalanceApiServlet;
import org.apache.asterix.api.http.server.ShutdownApiServlet;
import org.apache.asterix.api.http.server.UpdateApiServlet;
import org.apache.asterix.api.http.server.VersionApiServlet;
@@ -226,6 +227,7 @@
         addServlet(jsonAPIServer, Servlets.SHUTDOWN);
         addServlet(jsonAPIServer, Servlets.VERSION);
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE);
+        addServlet(jsonAPIServer, Servlets.REBALANCE);
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_NODE_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
@@ -283,6 +285,8 @@
                         componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
+            case Servlets.REBALANCE:
+                return new RebalanceApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
                 return new ShutdownApiServlet(ctx, paths);
             case Servlets.VERSION:
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index 039933a..b937137 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -43,4 +43,5 @@
jobSpec.addRoot(frod);
return jobSpec;
}
+
}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
new file mode 100644
index 0000000..afc2709
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.asterix.utils;
+
+import static org.apache.asterix.app.translator.QueryTranslator.abort;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.lock.LockList;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+
+public class RebalanceUtil {
+
+    private RebalanceUtil() {
+
+    }
+
+    public static void rebalance(String dataverseName, String datasetName, List<String> targetNcNames,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
+        // The source dataset.
+        Dataset sourceDataset = getSourceDataset(dataverseName, datasetName, metadataProvider);
+
+        // The new node group name.
+        String nodeGroupName = datasetName + "_" + (sourceDataset.getRebalanceId() + 1);
+
+        // Creates a node group for rebalance.
+        createNodeGroup(nodeGroupName, targetNcNames, metadataProvider);
+
+        // The target dataset for rebalance.
+        Dataset targetDataset = new Dataset(sourceDataset, true, nodeGroupName);
+
+        // Rebalances the source dataset into the target dataset.
+        rebalance(sourceDataset, targetDataset, metadataProvider, hcc);
+    }
+
+    private static Dataset getSourceDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider)
+            throws ACIDException, RemoteException, AlgebricksException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset sourceDataset = null;
+        try {
+            sourceDataset = metadataProvider.findDataset(dataverseName, datasetName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+        return sourceDataset;
+    }
+
+    // Creates a node group for the rebalance target dataset.
+    private static void createNodeGroup(String ngName, List<String> ncNames, MetadataProvider metadataProvider)
+            throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName);
+        try {
+            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+            if (ng == null) {
+                MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+    }
+
+    private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Creates the rebalance target.
+        createRebalanceTarget(target, metadataProvider, hcc);
+
+        // Populates the data from the rebalance source to the rebalance target.
+        populateDataToRebalanceTarget(source, target, metadataProvider, hcc);
+
+        // Atomically switches the rebalance target to become the source dataset.
+        rebalanceSwitch(source, target, metadataProvider, hcc);
+    }
+
+    private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        MutableObject<JobUtils.ProgressState> progress = new MutableObject<>(JobUtils.ProgressState.NO_PROGRESS);
+        MutableObject<MetadataTransactionContext> mdTxnCtx = new MutableObject<>(
+                MetadataManager.INSTANCE.beginTransaction());
+        writeLockDataset(metadataProvider.getLocks(), source);
+        try {
+            MutableBoolean bActiveTxn = new MutableBoolean(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+            List<JobSpecification> jobsToExecute = new ArrayList<>();
+            // Drops the original dataset's metadata entry.
+            source.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc);
+            // Adds a new rebalanced dataset entry in the metadata storage.
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), target);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx.getValue());
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+    }
+
+    private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Creates the files for the rebalance target.
+        JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider);
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+    private static void populateDataToRebalanceTarget(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        JobSpecification spec = new JobSpecification();
+        JobId jobId = JobIdFactory.generateJobId();
+        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+        // The pipeline starter.
+        IOperatorDescriptor ets = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
+
+        // Creates the primary index scan op.
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId);
+
+        // Creates the primary index upsert op.
+        IOperatorDescriptor upsertOp = DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
+
+        // The final commit operator.
+        IOperatorDescriptor commitOp = DatasetUtil.createUpsertCommitOp(spec, metadataProvider, jobId, target);
+
+        // Connects empty-tuple-source and scan.
+        spec.connect(new OneToOneConnectorDescriptor(spec), ets, 0, primaryScanOp, 0);
+
+        // Connects scan and upsert, hash-partitioning tuples on the primary keys.
+        int numKeys = target.getPrimaryKeys().size();
+        int[] keys = new int[numKeys];
+        for (int i = 0; i < numKeys; ++i) {
+            keys[i] = i;
+        }
+        IConnectorDescriptor connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys, target.getPrimaryHashFunctionFactories(metadataProvider)));
+        spec.connect(connectorDescriptor, primaryScanOp, 0, upsertOp, 0);
+
+        // Connects upsert and commit.
+        spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0);
+
+        // Executes the job.
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+    private static void writeLockDataset(LockList locks, Dataset dataset) {
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dataset.getDataverseName());
+        MetadataLockManager.INSTANCE.acquireDatasetWriteLock(locks,
+                dataset.getDataverseName() + "." + dataset.getDatasetName());
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 856638f..cbe551b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -406,7 +406,7 @@
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
             List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
-            FileSplit[] splits = SplitsAndConstraintsUtil.getDatasetSplits(dataset, nodes, index.getIndexName(), false);
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes);
             fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
new file mode 100644
index 0000000..778e4ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the SQL++ rebalance runtime tests.
+ */
+@RunWith(Parameterized.class)
+public class RebalanceTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "RebalanceTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public RebalanceTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse tpch if exists;
+create dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+ l_orderkey : bigint,
+ l_partkey : bigint,
+ l_suppkey : bigint,
+ l_linenumber : bigint,
+ l_quantity : double,
+ l_extendedprice : double,
+ l_discount : double,
+ l_tax : double,
+ l_returnflag : string,
+ l_linestatus : string,
+ l_shipdate : string,
+ l_commitdate : string,
+ l_receiptdate : string,
+ l_shipinstruct : string,
+ l_shipmode : string,
+ l_comment : string
+}
+
+create dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
new file mode 100644
index 0000000..8a59946
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+    (`format`=`delimited-text`),(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
new file mode 100644
index 0000000..ca42f3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
new file mode 100644
index 0000000..f5a2a75
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/rebalance?dataverseName=tpch&datasetName=LineItem&nodes="asterix_nc1"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
new file mode 100644
index 0000000..360a01f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/connector?dataverseName=tpch&datasetName=LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
new file mode 100644
index 0000000..ca42f3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
new file mode 100644
index 0000000..d380494
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ /*
+ ~ * Licensed to the Apache Software Foundation (ASF) under one
+ ~ * or more contributor license agreements. See the NOTICE file
+ ~ * distributed with this work for additional information
+ ~ * regarding copyright ownership. The ASF licenses this file
+ ~ * to you under the Apache License, Version 2.0 (the
+ ~ * "License"); you may not use this file except in compliance
+ ~ * with the License. You may obtain a copy of the License at
+ ~ *
+ ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ *
+ ~ * Unless required by applicable law or agreed to in writing,
+ ~ * software distributed under the License is distributed on an
+ ~ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ * KIND, either express or implied. See the License for the
+ ~ * specific language governing permissions and limitations
+ ~ * under the License.
+ ~ */
+ -->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+ <test-group name="rebalance">
+ <test-case FilePath="rebalance">
+ <compilation-unit name="single_dataset">
+ <output-dir compare="Text">single_dataset</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
new file mode 100644
index 0000000..4b9eb7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
@@ -0,0 +1 @@
+{"results":"successful"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
new file mode 100644
index 0000000..4f0990e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
@@ -0,0 +1 @@
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 3047ef5..f11461b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -32,6 +32,7 @@
public static final String QUERY_RESULT = "/query/service/result/*";
public static final String QUERY_SERVICE = "/query/service";
public static final String CONNECTOR = "/connector";
+ public static final String REBALANCE = "/rebalance";
public static final String SHUTDOWN = "/admin/shutdown";
public static final String VERSION = "/admin/version";
public static final String RUNNING_REQUESTS = "/admin/requests/running/*";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 1aa1474..d49487d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -37,7 +37,6 @@
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
-    public static final String ADAPTER_INSTANCE_PREFIX = "adapter_";

     private StoragePathUtil() {
     }
@@ -61,16 +60,18 @@
         return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
     }

-    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName,
+            long rebalanceId) {
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceId));
     }

     public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
         return dataverseName + File.separator + fullIndexName;
     }

-    private static String prepareFullIndexName(String datasetName, String idxName) {
-        return datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceId) {
+        return (rebalanceId == 0 ? "" : rebalanceId + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR
+                + idxName;
     }

     public static int getPartitionNumFromName(String name) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index 01ade10..c84a5bd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -26,7 +26,6 @@
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.metadata.api.IMetadataEntity;
import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -39,9 +38,9 @@
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.Library;
import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.IndexUtil;
/**
* Caches metadata entities such that the MetadataManager does not have to
@@ -161,10 +160,7 @@
         // Add the primary index associated with the dataset, if the dataset is an
         // internal dataset.
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-            InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
-            Index index = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-                    dataset.getDatasetName(), IndexType.BTREE, id.getPartitioningKey(),
-                    id.getKeySourceIndicator(), id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+            Index index = IndexUtil.getPrimaryIndex(dataset);
             addIndexIfNotExistsInternal(index);
         }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 272cced..673a5ae 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -335,10 +335,6 @@
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName);
         index.setFile(file);
-        // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
-        // a dataset that was not yet created
-        List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager()
-                .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
         ITypeTraits[] typeTraits = index.getTypeTraits();
         IBinaryComparatorFactory[] cmpFactories = index.getKeyBinaryComparatorFactory();
         int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index ee5c9f3..b259744 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -19,7 +19,6 @@
package org.apache.asterix.metadata.bootstrap;
-import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -27,6 +26,7 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ImmutableDatasetId;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -232,7 +232,9 @@
     @Override
     public String getFileNameRelativePath() {
-        return getDataverseName() + File.separator + getIndexedDatasetName() + "_idx_" + getIndexName();
+        // The rebalance id for a metadata dataset is always 0.
+        return StoragePathUtil.prepareDataverseIndexName(getDataverseName(), getIndexedDatasetName(), getIndexName(),
+                0);
     }

     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 69f4e03..b094801 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -82,7 +82,6 @@
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -306,6 +305,14 @@
         return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
     }

+    public IAType findType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+    }
+
+    public IAType findMetaType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+    }
+
     public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
         return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
     }
@@ -379,17 +386,6 @@
             throw new AlgebricksException(e);
         }
         return new Pair<>(dataScanner, constraint);
-    }
-
-    public IDataFormat getDataFormat(String dataverseName) throws CompilationException {
-        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        IDataFormat format;
-        try {
-            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new CompilationException(e);
-        }
-        return format;
     }

     public Dataverse findDataverse(String dataverseName) throws CompilationException {
@@ -760,10 +756,9 @@
         return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }

-    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
-            String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx,
-                targetIdxName, temp);
+    public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
+            throws AlgebricksException {
+        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx);
+    }

     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -860,79 +855,10 @@
                 fieldPermutation[i++] = idx;
             }
         }
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String itemTypeName = dataset.getItemTypeName();
-            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset);
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            boolean hasSecondaries = MetadataManager.INSTANCE
-                    .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
-
-            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            IIndexDataflowHelperFactory idfh =
-                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
-            LSMPrimaryUpsertOperatorDescriptor op;
-            ITypeTraits[] outputTypeTraits =
-                    new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-
-            // add the previous record first
-            int f = 0;
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            f++;
-            // add the previous meta second
-            if (dataset.hasMetaPart()) {
-                outputSerDes[f] =
-                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
-                f++;
-            }
-            // add the previous filter third
-            int fieldIdx = -1;
-            if (numFilterFields > 0) {
-                String filterField = DatasetUtil.getFilterField(dataset).get(0);
-                for (i = 0; i < itemType.getFieldNames().length; i++) {
-                    if (itemType.getFieldNames()[i].equals(filterField)) {
-                        break;
-                    }
-                }
-                fieldIdx = i;
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
-                f++;
-            }
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j + f] = recordDesc.getFields()[j];
-            }
-            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
-                    context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory,
-                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
-            return new Pair<>(op, splitsAndConstraint.second);
-
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+                context.getMissingWriterFactory());
     }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory, IDataFormat format)
@@ -1635,15 +1561,12 @@
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds)
             throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), ds.getDatasetName(),
-                ds.getDatasetDetails().isTemp());
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+        return getSplitProviderAndConstraints(ds, ds.getDatasetName());
     }

     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds,
             String indexName) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), ds.getDatasetName(), indexName,
-                ds.getDatasetDetails().isTemp());
+        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 34fa7bb..2dffc02 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -44,6 +44,9 @@
import org.apache.asterix.common.utils.JobUtils.ProgressState;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataCache;
import org.apache.asterix.metadata.MetadataManager;
@@ -75,17 +78,23 @@
 import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -133,7 +142,9 @@
private final IDatasetDetails datasetDetails;
private final String metaTypeDataverseName;
private final String metaTypeName;
+ private final long rebalanceId;
private int pendingOp;
+
/*
* Transient (For caching)
*/
@@ -151,6 +162,30 @@
String metaItemTypeDataverseName, String metaItemTypeName, String
nodeGroupName, String compactionPolicy,
Map<String, String> compactionPolicyProperties, IDatasetDetails
datasetDetails, Map<String, String> hints,
DatasetType datasetType, int datasetId, int pendingOp) {
+ this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName,
metaItemTypeDataverseName,
+ metaItemTypeName, nodeGroupName, compactionPolicy,
compactionPolicyProperties, datasetDetails, hints,
+ datasetType, datasetId, pendingOp, 0L);
+ }
+
+ public Dataset(Dataset dataset) {
+ this(dataset.dataverseName, dataset.datasetName,
dataset.recordTypeDataverseName, dataset.recordTypeName,
+ dataset.metaTypeDataverseName, dataset.metaTypeName,
dataset.nodeGroupName,
+ dataset.compactionPolicyFactory,
dataset.compactionPolicyProperties, dataset.datasetDetails,
+ dataset.hints, dataset.datasetType, dataset.datasetId,
dataset.pendingOp, dataset.rebalanceId);
+ }
+
+ public Dataset(Dataset dataset, boolean forRebalance, String
nodeGroupName) {
+ this(dataset.dataverseName, dataset.datasetName,
dataset.recordTypeDataverseName, dataset.recordTypeName,
+ dataset.metaTypeDataverseName, dataset.metaTypeName,
nodeGroupName, dataset.compactionPolicyFactory,
+ dataset.compactionPolicyProperties, dataset.datasetDetails,
dataset.hints, dataset.datasetType,
+ forRebalance ?
DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) :
dataset.datasetId,
+ dataset.pendingOp, forRebalance ? dataset.rebalanceId + 1 :
dataset.rebalanceId);
+ }
+
+ public Dataset(String dataverseName, String datasetName, String
itemTypeDataverseName, String itemTypeName,
+ String metaItemTypeDataverseName, String metaItemTypeName, String
nodeGroupName, String compactionPolicy,
+ Map<String, String> compactionPolicyProperties, IDatasetDetails
datasetDetails, Map<String, String> hints,
+ DatasetType datasetType, int datasetId, int pendingOp, long
rebalanceId) {
this.dataverseName = dataverseName;
this.datasetName = datasetName;
this.recordTypeName = itemTypeName;
@@ -165,13 +200,7 @@
this.datasetId = datasetId;
this.pendingOp = pendingOp;
this.hints = hints;
- }
-
- public Dataset(Dataset dataset) {
- this(dataset.dataverseName, dataset.datasetName,
dataset.recordTypeDataverseName, dataset.recordTypeName,
- dataset.metaTypeDataverseName, dataset.metaTypeName,
dataset.nodeGroupName,
- dataset.compactionPolicyFactory,
dataset.compactionPolicyProperties, dataset.datasetDetails,
- dataset.hints, dataset.datasetType, dataset.datasetId,
dataset.pendingOp);
+ this.rebalanceId = rebalanceId;
}
public String getDataverseName() {
@@ -228,6 +257,10 @@
public String getMetaItemTypeName() {
return metaTypeName;
+ }
+
+ public long getRebalanceId() {
+ return rebalanceId;
}
public boolean hasMetaPart() {
@@ -591,31 +624,12 @@
return Objects.hash(dataverseName, datasetName);
}
- public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[]
primaryKeyFields,
- MetadataProvider metadataProvider, int[] datasetPartitions,
boolean isSink) {
+ public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider
metadataProvider, JobId jobId,
+ int[] primaryKeyFields, boolean isSink) throws AlgebricksException
{
+ int[] datasetPartitions = getDatasetPartitions(metadataProvider);
return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
metadataProvider.isTemporaryDatasetWriteJob(),
metadataProvider.isWriteTransaction(), datasetPartitions,
isSink);
- }
-
- /**
- * Get the index dataflow helper factory for the dataset's primary index
- *
- * @param mdProvider
- * an instance of metadata provider that is used to fetch
metadata information
- * @throws AlgebricksException
- */
- public IResourceFactory getResourceFactory(MetadataProvider mdProvider)
throws AlgebricksException {
- if (getDatasetType() != DatasetType.INTERNAL) {
- throw new AlgebricksException(ErrorCode.ASTERIX,
-
ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX,
getDatasetType());
- }
- Index index = mdProvider.getIndex(getDataverseName(),
getDatasetName(), getDatasetName());
- ARecordType recordType = (ARecordType)
mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
- ARecordType metaType = (ARecordType)
mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(this,
mdProvider.getMetadataTxnContext());
- return getResourceFactory(mdProvider, index, recordType, metaType,
compactionInfo.first, compactionInfo.second);
}
public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
@@ -659,6 +673,41 @@
return typeTraits;
}
+ public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider
metadataProvider) throws AlgebricksException {
+ List<List<String>> partitioningKeys = getPrimaryKeys();
+ int numPrimaryKeys = partitioningKeys.size();
+ ISerializerDeserializer[] primaryRecFields = new
ISerializerDeserializer[numPrimaryKeys + 1
+ + (hasMetaPart() ? 1 : 0)];
+ ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 +
(hasMetaPart() ? 1 : 0)];
+ ISerializerDeserializerProvider serdeProvider =
metadataProvider.getFormat().getSerdeProvider();
+ List<Integer> indicators = null;
+ if (hasMetaPart()) {
+ indicators = ((InternalDatasetDetails)
getDatasetDetails()).getKeySourceIndicator();
+ }
+ ARecordType itemType = (ARecordType) metadataProvider.findType(this);
+ ARecordType metaType = (ARecordType)
metadataProvider.findMetaType(this);
+
+ // Set the serde/traits for primary keys
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ IAType keyType = (indicators == null || indicators.get(i) == 0)
+ ? itemType.getSubFieldType(partitioningKeys.get(i))
+ : metaType.getSubFieldType(partitioningKeys.get(i));
+ primaryRecFields[i] =
serdeProvider.getSerializerDeserializer(keyType);
+ primaryTypeTraits[i] =
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+ }
+
+ // Set the serde for the record field
+ primaryRecFields[numPrimaryKeys] =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+ primaryTypeTraits[numPrimaryKeys] =
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ if (hasMetaPart()) {
+ // Set the serde and traits for the meta record field
+ primaryRecFields[numPrimaryKeys + 1] =
SerializerDeserializerProvider.INSTANCE
+ .getSerializerDeserializer(metaType);
+        primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+ }
+ return new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+ }
+
public IBinaryComparatorFactory[]
getPrimaryComparatorFactories(MetadataProvider metadataProvider,
ARecordType recordType, ARecordType metaType) throws
AlgebricksException {
IStorageComponentProvider storageComponentProvider =
metadataProvider.getStorageComponentProvider();
@@ -671,12 +720,32 @@
indicators = ((InternalDatasetDetails)
getDatasetDetails()).getKeySourceIndicator();
}
for (int i = 0; i < numPrimaryKeys; i++) {
- IAType keyType =
- (indicators == null || indicators.get(i) == 0) ?
recordType.getSubFieldType(partitioningKeys.get(i))
- :
metaType.getSubFieldType(partitioningKeys.get(i));
+ IAType keyType = (indicators == null || indicators.get(i) == 0)
+ ? recordType.getSubFieldType(partitioningKeys.get(i))
+ : metaType.getSubFieldType(partitioningKeys.get(i));
cmpFactories[i] =
cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
}
return cmpFactories;
+ }
+
+ public IBinaryHashFunctionFactory[]
getPrimaryHashFunctionFactories(MetadataProvider metadataProvider)
+ throws AlgebricksException {
+ ARecordType recordType = (ARecordType) metadataProvider.findType(this);
+ ARecordType metaType = (ARecordType)
metadataProvider.findMetaType(this);
+ List<List<String>> partitioningKeys = getPrimaryKeys();
+ int numPrimaryKeys = partitioningKeys.size();
+ IBinaryHashFunctionFactory[] hashFuncFactories = new
IBinaryHashFunctionFactory[numPrimaryKeys];
+ List<Integer> indicators = null;
+ if (hasMetaPart()) {
+ indicators = ((InternalDatasetDetails)
getDatasetDetails()).getKeySourceIndicator();
+ }
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ IAType keyType =
+ (indicators == null || indicators.get(i) == 0) ?
recordType.getSubFieldType(partitioningKeys.get(i))
+ :
metaType.getSubFieldType(partitioningKeys.get(i));
+ hashFuncFactories[i] =
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(keyType);
+ }
+ return hashFuncFactories;
}
@Override
@@ -689,4 +758,14 @@
}
return bloomFilterKeyFields;
}
+
+ protected int[] getDatasetPartitions(MetadataProvider metadataProvider)
throws AlgebricksException {
+ FileSplit[] splitsForDataset =
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
+ getDatasetName());
+ int[] datasetPartitions = new int[splitsForDataset.length];
+ for (int i = 0; i < splitsForDataset.length; i++) {
+ datasetPartitions[i] = i;
+ }
+ return datasetPartitions;
+ }
}
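
A minimal sketch of how the new rebalance constructor and getPrimaryRecordDescriptor compose; the node group name "rebalance_ng_1" and the wrapper method are hypothetical and not part of this change:

    // Create the rebalance target for an existing dataset. The target keeps the
    // same dataverse/dataset name but gets an alternated dataset id and
    // rebalanceId = source.getRebalanceId() + 1, so its storage files cannot
    // collide with the source's while both are alive.
    static Dataset createRebalanceTarget(Dataset source, MetadataProvider mp) throws AlgebricksException {
        Dataset target = new Dataset(source, true, "rebalance_ng_1"); // hypothetical node group
        // The primary record descriptor lays fields out as [pk..., record], or
        // [pk..., record, meta] when the dataset has a meta part.
        RecordDescriptor desc = target.getPrimaryRecordDescriptor(mp);
        assert desc.getFieldCount() == source.getPrimaryKeys().size() + 1 + (source.hasMetaPart() ? 1 : 0);
        return target;
    }
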
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index df47c70..5243855 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -41,7 +41,6 @@
private static final long serialVersionUID = 1L;
public static final int RECORD_INDICATOR = 0;
- public static final int META_INDICATOR = 1;
private final String dataverseName;
// Enforced to be unique within a dataverse.
@@ -59,6 +58,13 @@
// Type of pending operations with respect to atomic DDL operation
private int pendingOp;
+
+//    public Index(Dataset dataset) {
+//        // Constructs the primary index from the dataset. Superseded by
+//        // IndexUtil.getPrimaryIndex(dataset), which performs this construction.
+//    }
+
public Index(String dataverseName, String datasetName, String indexName,
IndexType indexType,
List<List<String>> keyFieldNames, List<Integer>
keyFieldSourceIndicators, List<IAType> keyFieldTypes,
int gramLength, boolean enforceKeyFields, boolean isPrimaryIndex,
int pendingOp) {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index c3c5023..3273a68 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -51,8 +51,10 @@
import org.apache.asterix.om.base.ABoolean;
import org.apache.asterix.om.base.ADateTime;
import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
import org.apache.asterix.om.base.AInt8;
import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.AOrderedList;
import org.apache.asterix.om.base.ARecord;
@@ -73,26 +75,26 @@
*/
public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
private static final long serialVersionUID = 1L;
- // Field indexes of serialized Dataset in a tuple.
- // First key field.
- public static final int DATASET_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
- // Second key field.
- public static final int DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
// Payload field containing serialized Dataset.
public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+ private static final String REBALANCE_ID_FIELD_NAME = "rebalanceId";
@SuppressWarnings("unchecked")
protected final ISerializerDeserializer<ARecord> recordSerDes =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.DATASET_RECORDTYPE);
protected final transient AMutableInt32 aInt32;
protected final transient ISerializerDeserializer<AInt32> aInt32Serde;
+ protected final transient AMutableInt64 aBigInt;
+ protected final transient ISerializerDeserializer<AInt64> aBigIntSerde;
protected final transient ArrayBackedValueStorage fieldName = new
ArrayBackedValueStorage();
@SuppressWarnings("unchecked")
protected DatasetTupleTranslator(boolean getTuple) {
super(getTuple,
MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
aInt32 = new AMutableInt32(-1);
+ aBigInt = new AMutableInt64(-1);
aInt32Serde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+ aBigIntSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
}
@Override
@@ -107,7 +109,6 @@
}
protected Dataset createDatasetFromARecord(ARecord datasetRecord) throws
HyracksDataException {
-
String dataverseName =
((AString)
datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX))
.getStringValue();
@@ -258,9 +259,14 @@
metaTypeName = ((AString)
datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue();
}
+ // Read the rebalance id if there is one.
+ int rebalanceIdIndex =
datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
+ long rebalanceId = rebalanceIdIndex >= 0
+ ? ((AInt64)
datasetRecord.getValueByPos(rebalanceIdIndex)).getLongValue() : 0;
+
return new Dataset(dataverseName, datasetName, typeDataverseName,
typeName, metaTypeDataverseName, metaTypeName,
nodeGroupName, compactionPolicy, compactionPolicyProperties,
datasetDetails, hints, datasetType,
- datasetId, pendingOp);
+ datasetId, pendingOp, rebalanceId);
}
@Override
@@ -409,6 +415,16 @@
stringSerde.serialize(aString, fieldValue.getDataOutput());
recordBuilder.addField(fieldName, fieldValue);
}
+ if (dataset.getRebalanceId() > 0) {
+ // Adds the field rebalanceId.
+ fieldName.reset();
+ aString.setValue("rebalanceId");
+ stringSerde.serialize(aString, fieldName.getDataOutput());
+ fieldValue.reset();
+ aBigInt.setValue(dataset.getRebalanceId());
+ aBigIntSerde.serialize(aBigInt, fieldValue.getDataOutput());
+ recordBuilder.addField(fieldName, fieldValue);
+ }
}
protected void writeDatasetDetailsRecordType(IARecordBuilder
recordBuilder, Dataset dataset, DataOutput dataOutput)
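
The rebalanceId is written as an open field only when it is positive, and read back with a defaulting lookup, so dataset records serialized before this change still deserialize unchanged. A minimal sketch of the read side under that assumption:

    // Absent field => getFieldIndex() returns -1 => the id defaults to 0,
    // which is exactly the value a never-rebalanced dataset carries.
    int idx = datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
    long rebalanceId = idx >= 0 ? ((AInt64) datasetRecord.getValueByPos(idx)).getLongValue() : 0L;
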
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
index 6e6f086..418d48f 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
@@ -21,13 +21,16 @@
import java.util.ArrayList;
import java.util.List;
-import org.apache.asterix.metadata.lock.IMetadataLock.Mode;
import org.apache.commons.lang3.tuple.Pair;
public class LockList {
- List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new ArrayList<>();
+ private List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new
ArrayList<>();
+ private boolean unlocked = false;
public void add(IMetadataLock.Mode mode, IMetadataLock lock) {
+ if (unlocked) {
+            throw new IllegalStateException("The lock list can't be used after unlock() until reset() is called.");
+ }
lock.acquire(mode);
locks.add(Pair.of(mode, lock));
}
@@ -37,9 +40,12 @@
Pair<IMetadataLock.Mode, IMetadataLock> pair = locks.get(i);
pair.getRight().release(pair.getLeft());
}
+ locks.clear();
+ unlocked = true;
}
public void reset() {
- locks.clear();
+ unlock();
+ unlocked = false;
}
}
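
A minimal sketch of the lifecycle the new guard enforces; `lock` stands for any acquired IMetadataLock and is hypothetical:

    LockList locks = new LockList();
    locks.add(IMetadataLock.Mode.READ, lock);  // acquire and record
    locks.unlock();                            // release everything; list becomes unusable
    // locks.add(...) here would throw IllegalStateException: the guard turns a
    // use-after-unlock bug into an immediate failure instead of a leaked lock.
    locks.reset();                             // re-arms the list for reuse
    locks.add(IMetadataLock.Mode.READ, lock);  // legal again
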
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 098645e..6223bba 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -19,7 +19,6 @@
package org.apache.asterix.metadata.utils;
import java.io.DataOutput;
-import java.io.File;
import java.rmi.RemoteException;
import java.util.List;
import java.util.Map;
@@ -29,11 +28,18 @@
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -47,25 +53,43 @@
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.utils.RuntimeUtils;
+import
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -217,8 +241,7 @@
public static JobSpecification dropDatasetJobSpec(Dataset dataset,
MetadataProvider metadataProvider)
throws AlgebricksException, HyracksDataException, RemoteException,
ACIDException {
- String datasetPath = dataset.getDataverseName() + File.separator +
dataset.getDatasetName();
- LOGGER.info("DROP DATASETPATH: " + datasetPath);
+ LOGGER.info("DROP DATASET: " + dataset);
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
}
@@ -249,41 +272,32 @@
return spec;
}
- public static JobSpecification createDatasetJobSpec(Dataverse dataverse,
String datasetName,
- MetadataProvider metadataProvider) throws AlgebricksException {
- String dataverseName = dataverse.getDataverseName();
- Dataset dataset = metadataProvider.findDataset(dataverseName,
datasetName);
- if (dataset == null) {
- throw new AsterixException("Could not find dataset " + datasetName
+ " in dataverse " + dataverseName);
- }
- Index index =
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
dataverseName,
- datasetName, datasetName);
- ARecordType itemType =
- (ARecordType)
metadataProvider.findType(dataset.getItemTypeDataverseName(),
dataset.getItemTypeName());
+ public static JobSpecification createDatasetJobSpec(Dataset dataset,
MetadataProvider metadataProvider)
+ throws AlgebricksException {
+ Index index = IndexUtil.getPrimaryIndex(dataset);
+ ARecordType itemType = (ARecordType)
metadataProvider.findType(dataset);
// get meta item type
ARecordType metaItemType = null;
if (dataset.hasMetaPart()) {
- metaItemType = (ARecordType)
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
- dataset.getMetaItemTypeName());
+            metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
}
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
- Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint =
- metadataProvider.getSplitProviderAndConstraints(dataset);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint = metadataProvider
+ .getSplitProviderAndConstraints(dataset);
FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
sb.append(fs[i] + " ");
}
LOGGER.info("CREATING File Splits: " + sb.toString());
-
- Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
- DatasetUtil.getMergePolicyFactory(dataset,
metadataProvider.getMetadataTxnContext());
+ Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset,
+ metadataProvider.getMetadataTxnContext());
//prepare a LocalResourceMetadata which will be stored in NC's local
resource repository
IResourceFactory resourceFactory =
dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
compactionInfo.first, compactionInfo.second);
- IndexBuilderFactory indexBuilderFactory =
- new
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- splitsAndConstraint.first, resourceFactory,
!dataset.isTemp());
+ IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(
+
metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first,
+ resourceFactory, !dataset.isTemp());
IndexCreateOperatorDescriptor indexCreateOp = new
IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
indexCreateOp,
splitsAndConstraint.second);
@@ -292,7 +306,7 @@
}
public static JobSpecification compactDatasetJobSpec(Dataverse dataverse,
String datasetName,
- MetadataProvider metadataProvider) throws AsterixException,
AlgebricksException {
+ MetadataProvider metadataProvider) throws AlgebricksException {
String dataverseName = dataverse.getDataverseName();
Dataset dataset = metadataProvider.findDataset(dataverseName,
datasetName);
if (dataset == null) {
@@ -313,6 +327,167 @@
return spec;
}
+ public static IOperatorDescriptor
createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider
metadataProvider,
+ Dataset dataset, JobId jobId) throws AlgebricksException {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
primarySplitsAndConstraint = metadataProvider
+ .getSplitProviderAndConstraints(dataset);
+ IFileSplitProvider primaryFileSplitProvider =
primarySplitsAndConstraint.first;
+ AlgebricksPartitionConstraint primaryPartitionConstraint =
primarySplitsAndConstraint.second;
+ // -Infinity
+ int[] lowKeyFields = null;
+ // +Infinity
+ int[] highKeyFields = null;
+ ITransactionSubsystemProvider txnSubsystemProvider =
TransactionSubsystemProvider.INSTANCE;
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ ISearchOperationCallbackFactory searchCallbackFactory = temp ?
NoOpOperationCallbackFactory.INSTANCE
+ : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId,
dataset.getDatasetId(),
+ dataset.getPrimaryBloomFilterFields(),
txnSubsystemProvider,
+ IRecoveryManager.ResourceType.LSM_BTREE);
+ IndexDataflowHelperFactory indexHelperFactory = new
IndexDataflowHelperFactory(
+
metadataProvider.getStorageComponentProvider().getStorageManager(),
primaryFileSplitProvider);
+ BTreeSearchOperatorDescriptor primarySearchOp = new
BTreeSearchOperatorDescriptor(spec,
+ dataset.getPrimaryRecordDescriptor(metadataProvider),
lowKeyFields, highKeyFields, true, true,
+ indexHelperFactory, false, false, null, searchCallbackFactory,
null, null, false);
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
primarySearchOp,
+ primaryPartitionConstraint);
+ return primarySearchOp;
+ }
+
+ public static IOperatorDescriptor
createPrimaryIndexUpsertOp(JobSpecification spec,
+ MetadataProvider metadataProvider, Dataset source, Dataset target)
throws AlgebricksException {
+ int numKeys = source.getPrimaryKeys().size();
+ int numValues = source.hasMetaPart() ? 2 : 1;
+ int[] fieldPermutation = new int[numKeys + numValues];
+ for (int i = 0; i < fieldPermutation.length; ++i) {
+ fieldPermutation[i] = i;
+ }
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
upsertOpAndConstraints = createPrimaryIndexUpsertOp(
+ spec, metadataProvider, target,
source.getPrimaryRecordDescriptor(metadataProvider), fieldPermutation,
+ MissingWriterFactory.INSTANCE);
+ IOperatorDescriptor upsertOp = upsertOpAndConstraints.first;
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
upsertOp,
+ upsertOpAndConstraints.second);
+ return upsertOp;
+ }
+
+ public static IOperatorDescriptor createUpsertCommitOp(JobSpecification
spec, MetadataProvider metadataProvider,
+ JobId jobId, Dataset target) throws AlgebricksException {
+ int numKeys = target.getPrimaryKeys().size();
+ int[] primaryKeyFields = new int[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+ AlgebricksMetaOperatorDescriptor metaOp = new
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+ new IPushRuntimeFactory[] {
+ target.getCommitRuntimeFactory(metadataProvider,
jobId, primaryKeyFields, true) },
+ new RecordDescriptor[] {
target.getPrimaryRecordDescriptor(metadataProvider) });
+ return metaOp;
+ }
+
+ public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
createPrimaryIndexUpsertOp(
+ JobSpecification spec, MetadataProvider metadataProvider, Dataset
dataset, RecordDescriptor inputRecordDesc,
+ int[] fieldPermutation, IMissingWriterFactory
missingWriterFactory) throws AlgebricksException {
+ int numKeys = dataset.getPrimaryKeys().size();
+ int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0
: 1;
+ ARecordType itemType = (ARecordType)
metadataProvider.findType(dataset);
+ ARecordType metaItemType = (ARecordType)
metadataProvider.findMetaType(dataset);
+ try {
+ Index primaryIndex =
metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+ dataset.getDatasetName());
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraint = metadataProvider
+ .getSplitProviderAndConstraints(dataset);
+
+ // prepare callback
+ JobId jobId = ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId();
+ int[] primaryKeyFields = new int[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+ boolean hasSecondaries = metadataProvider
+ .getDatasetIndexes(dataset.getDataverseName(),
dataset.getDatasetName()).size() > 1;
+ IStorageComponentProvider storageComponentProvider =
metadataProvider.getStorageComponentProvider();
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storageComponentProvider, primaryIndex, jobId,
IndexOperation.UPSERT, primaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storageComponentProvider, primaryIndex, jobId,
IndexOperation.UPSERT, primaryKeyFields);
+ IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(),
splitsAndConstraint.first);
+ LSMPrimaryUpsertOperatorDescriptor op;
+ ITypeTraits[] outputTypeTraits = new
ITypeTraits[inputRecordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer<?>[] outputSerDes = new
ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+ // add the previous record first
+ int f = 0;
+            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(itemType);
+            f++;
+ // add the previous meta second
+ if (dataset.hasMetaPart()) {
+ outputSerDes[f] =
FormatUtils.getDefaultFormat().getSerdeProvider()
+ .getSerializerDeserializer(metaItemType);
+ outputTypeTraits[f] =
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
+ }
+ // add the previous filter third
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField =
DatasetUtil.getFilterField(dataset).get(0);
+ String[] fieldNames = itemType.getFieldNames();
+ int i = 0;
+ for (; i < fieldNames.length; i++) {
+ if (fieldNames[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[f] =
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[f] =
FormatUtils.getDefaultFormat().getSerdeProvider()
+
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
+ }
+ for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+ outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+ }
+ RecordDescriptor outputRecordDesc = new
RecordDescriptor(outputSerDes, outputTypeTraits);
+ op = new LSMPrimaryUpsertOperatorDescriptor(spec,
outputRecordDesc, fieldPermutation, idfh,
+ missingWriterFactory, modificationCallbackFactory,
searchCallbackFactory,
+ dataset.getFrameOpCallbackFactory(), numKeys, itemType,
fieldIdx, hasSecondaries);
+ return new Pair<>(op, splitsAndConstraint.second);
+ } catch (MetadataException me) {
+ throw new AlgebricksException(me);
+ }
+ }
+
+ public static AbstractOperatorDescriptor
createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
+ MetadataProvider metadataProvider) throws AlgebricksException {
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
primarySplitsAndConstraint = metadataProvider
+ .getSplitProviderAndConstraints(dataset);
+ AlgebricksPartitionConstraint primaryPartitionConstraint =
primarySplitsAndConstraint.second;
+
+ // Build dummy tuple containing one field with a dummy value inside.
+ ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+ DataOutput dos = tb.getDataOutput();
+ tb.reset();
+ try {
+ // Serialize dummy value into a field.
+ IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+ } catch (HyracksDataException e) {
+ throw new AsterixException(e);
+ }
+ // Add dummy field.
+ tb.addFieldEndOffset();
+ ISerializerDeserializer[] keyRecDescSers = {
IntegerSerializerDeserializer.INSTANCE };
+ RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+ ConstantTupleSourceOperatorDescriptor keyProviderOp = new
ConstantTupleSourceOperatorDescriptor(spec,
+ keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
keyProviderOp,
+ primaryPartitionConstraint);
+ return keyProviderOp;
+ }
+
public static boolean isFullyQualifiedName(String datasetName) {
return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name
can't start with a .
}
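
Taken together, the new helpers are enough to assemble a copy pipeline from a source dataset into its rebalance target. A hedged sketch, assuming `appCtx`, `source`, and `target` are in scope and that tuples are hash-repartitioned on the primary keys between scan and upsert (the repartitioning choice is an assumption, not shown in this patch):

    JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
    JobId jobId = JobIdFactory.generateJobId();
    metadataProvider.setJobId(jobId);
    spec.setJobletEventListenerFactory(new JobEventListenerFactory(jobId, true));
    IOperatorDescriptor keyProvider = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
    IOperatorDescriptor scan = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId);
    IOperatorDescriptor upsert = DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
    IOperatorDescriptor commit = DatasetUtil.createUpsertCommitOp(spec, metadataProvider, jobId, target);
    int[] pkFields = new int[source.getPrimaryKeys().size()];
    for (int i = 0; i < pkFields.length; i++) {
        pkFields[i] = i;
    }
    spec.connect(new OneToOneConnectorDescriptor(spec), keyProvider, 0, scan, 0);
    spec.connect(new MToNPartitioningConnectorDescriptor(spec,
            new FieldHashPartitionComputerFactory(pkFields,
                    source.getPrimaryHashFunctionFactories(metadataProvider))),
            scan, 0, upsert, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), upsert, 0, commit, 0);
    spec.addRoot(commit);
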
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 6d07fc7..4baeb55 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -21,6 +21,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.OptimizationConfUtil;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -28,6 +29,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.om.types.ARecordType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -53,6 +55,13 @@
return secondaryFilterFields(dataset, index, filterTypeTraits);
}
+ public static Index getPrimaryIndex(Dataset dataset) {
+ InternalDatasetDetails id = (InternalDatasetDetails)
dataset.getDatasetDetails();
+ return new Index(dataset.getDataverseName(), dataset.getDatasetName(),
dataset.getDatasetName(),
+ DatasetConfig.IndexType.BTREE, id.getPartitioningKey(),
id.getKeySourceIndicator(),
+ id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+ }
+
public static int[] getBtreeFieldsIfFiltered(Dataset dataset, Index index)
throws AlgebricksException {
if (index.isPrimaryIndex()) {
return DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
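
A one-line usage sketch of the new helper at a hypothetical call site: since the primary index is fully determined by the dataset's partitioning metadata, callers can now construct it without a metadata lookup:

    Index primary = IndexUtil.getPrimaryIndex(dataset);
    // primary.isPrimaryIndex() == true; its name equals the dataset name.
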
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 1f7914d..4ce4708 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -20,8 +20,9 @@
import java.util.List;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -29,7 +30,9 @@
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
+import
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
@@ -46,12 +49,12 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -128,13 +131,21 @@
return spec;
} else {
// Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp =
createDummyKeyProviderOp(spec);
+ AbstractOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+ metadataProvider);
+ JobId jobId = JobIdFactory.generateJobId();
+ metadataProvider.setJobId(jobId);
+ boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+ IJobletEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(jobId,
+ isWriteTransaction);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
// Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp =
createPrimaryIndexScanOp(spec);
+ IOperatorDescriptor primaryScanOp =
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+ jobId);
// Assign op.
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
+ IOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec),
primaryScanOp, 0, sourceOp, 0);
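
One ordering subtlety worth noting: the JobId must be generated and registered with the MetadataProvider before the scan op is built, because the instant-search callback factory created inside createPrimaryIndexScanOp captures that id. A minimal sketch of the required order, under that reading of the code:

    JobId jobId = JobIdFactory.generateJobId();
    metadataProvider.setJobId(jobId); // must precede op construction
    spec.setJobletEventListenerFactory(
            new JobEventListenerFactory(jobId, metadataProvider.isWriteTransaction()));
    IOperatorDescriptor scan = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, jobId);
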
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index fa519e1..f0ce889 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,19 +19,13 @@
package org.apache.asterix.metadata.utils;
-import java.io.DataOutput;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingConstants;
import
org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor;
@@ -52,9 +46,6 @@
import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,21 +62,11 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
@SuppressWarnings("rawtypes")
@@ -262,59 +243,13 @@
primaryTypeTraits[numPrimaryKeys] =
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
if (dataset.hasMetaPart()) {
primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
- primaryTypeTraits[numPrimaryKeys + 1] =
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+ primaryTypeTraits[numPrimaryKeys + 1] =
TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
}
primaryRecDesc = new RecordDescriptor(primaryRecFields,
primaryTypeTraits);
}
protected abstract void setSecondaryRecDescAndComparators() throws
AlgebricksException;
- protected AbstractOperatorDescriptor
createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
- // Build dummy tuple containing one field with a dummy value inside.
- ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
- DataOutput dos = tb.getDataOutput();
- tb.reset();
- try {
- // Serialize dummy value into a field.
- IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
- } catch (HyracksDataException e) {
- throw new AsterixException(e);
- }
- // Add dummy field.
- tb.addFieldEndOffset();
- ISerializerDeserializer[] keyRecDescSers = {
IntegerSerializerDeserializer.INSTANCE };
- RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
- ConstantTupleSourceOperatorDescriptor keyProviderOp = new
ConstantTupleSourceOperatorDescriptor(spec,
- keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
keyProviderOp,
- primaryPartitionConstraint);
- return keyProviderOp;
- }
-
- protected BTreeSearchOperatorDescriptor
createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
- // -Infinity
- int[] lowKeyFields = null;
- // +Infinity
- int[] highKeyFields = null;
- ITransactionSubsystemProvider txnSubsystemProvider =
TransactionSubsystemProvider.INSTANCE;
- JobId jobId = JobIdFactory.generateJobId();
- metadataProvider.setJobId(jobId);
- boolean isWriteTransaction = metadataProvider.isWriteTransaction();
- IJobletEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(jobId, isWriteTransaction);
- spec.setJobletEventListenerFactory(jobEventListenerFactory);
- boolean temp = dataset.getDatasetDetails().isTemp();
- ISearchOperationCallbackFactory searchCallbackFactory = temp ?
NoOpOperationCallbackFactory.INSTANCE
- : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId,
dataset.getDatasetId(),
- primaryBloomFilterKeyFields, txnSubsystemProvider,
ResourceType.LSM_BTREE);
- IndexDataflowHelperFactory indexHelperFactory = new
IndexDataflowHelperFactory(
-
metadataProvider.getStorageComponentProvider().getStorageManager(),
primaryFileSplitProvider);
- BTreeSearchOperatorDescriptor primarySearchOp =
- new BTreeSearchOperatorDescriptor(spec, primaryRecDesc,
lowKeyFields, highKeyFields, true, true,
- indexHelperFactory, false, false, null,
searchCallbackFactory, null, null, false);
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
primarySearchOp,
- primaryPartitionConstraint);
- return primarySearchOp;
- }
protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification
spec, int numSecondaryKeyFields,
RecordDescriptor secondaryRecDesc) throws AlgebricksException {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 1bb1377..227435e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -18,10 +18,11 @@
*/
package org.apache.asterix.metadata.utils;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
@@ -30,7 +31,9 @@
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.om.utils.RecordUtil;
import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
+import
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -42,10 +45,12 @@
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
import
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -54,7 +59,6 @@
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -207,14 +211,21 @@
@Override
public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
JobSpecification spec =
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+ JobId jobId = JobIdFactory.generateJobId();
+ metadataProvider.setJobId(jobId);
+ boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+ IJobletEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(jobId, isWriteTransaction);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
// Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp =
createDummyKeyProviderOp(spec);
+ AbstractOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+ metadataProvider);
// Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp =
createPrimaryIndexScanOp(spec);
+ IOperatorDescriptor primaryScanOp =
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+ jobId);
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
+ IOperatorDescriptor sourceOp = primaryScanOp;
boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
int numSecondaryKeys = index.getKeyFieldNames().size();
if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 21fa754..ca47ccb 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -20,9 +20,10 @@
import java.util.List;
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -35,7 +36,9 @@
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import org.apache.asterix.runtime.utils.RuntimeUtils;
+import
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
@@ -48,12 +51,12 @@
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
@@ -202,13 +205,21 @@
metadataProvider.getStorageComponentProvider().getStorageManager(),
secondaryFileSplitProvider);
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
// Create dummy key provider for feeding the primary index scan.
- AbstractOperatorDescriptor keyProviderOp =
createDummyKeyProviderOp(spec);
+ AbstractOperatorDescriptor keyProviderOp =
DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+ metadataProvider);
+ JobId jobId = JobIdFactory.generateJobId();
+ metadataProvider.setJobId(jobId);
+ boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+ IJobletEventListenerFactory jobEventListenerFactory = new
JobEventListenerFactory(jobId,
+ isWriteTransaction);
+ spec.setJobletEventListenerFactory(jobEventListenerFactory);
// Create primary index scan op.
- BTreeSearchOperatorDescriptor primaryScanOp =
createPrimaryIndexScanOp(spec);
+ IOperatorDescriptor primaryScanOp =
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+ jobId);
// Assign op.
- AbstractOperatorDescriptor sourceOp = primaryScanOp;
+ IOperatorDescriptor sourceOp = primaryScanOp;
if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
sourceOp = createCastOp(spec, dataset.getDatasetType());
spec.connect(new OneToOneConnectorDescriptor(spec),
primaryScanOp, 0, sourceOp, 0);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index db33dd5..36e9dc3 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -56,24 +56,23 @@
return splits.toArray(new FileSplit[] {});
}
- public static FileSplit[] getDatasetSplits(Dataset dataset,
MetadataTransactionContext mdTxnCtx,
- String targetIdxName, boolean temp) throws AlgebricksException {
+ public static FileSplit[] getIndexSplits(Dataset dataset, String
indexName, MetadataTransactionContext mdTxnCtx)
+ throws AlgebricksException {
try {
List<String> nodeGroup =
MetadataManager.INSTANCE.getNodegroup(mdTxnCtx,
dataset.getNodeGroupName()).getNodeNames();
if (nodeGroup == null) {
throw new AlgebricksException("Couldn't find node group " +
dataset.getNodeGroupName());
}
- return getDatasetSplits(dataset, nodeGroup, targetIdxName, temp);
+ return getIndexSplits(dataset, indexName, nodeGroup);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
-    public static FileSplit[] getDatasetSplits(Dataset dataset, List<String> nodes, String targetIdxName,
-            boolean temp) {
+    public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, List<String> nodes) {
        File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
-                dataset.getDatasetName(), targetIdxName));
+                dataset.getDatasetName(), indexName, dataset.getRebalanceId()));
        String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
List<FileSplit> splits = new ArrayList<>();
for (String nd : nodes) {
@@ -88,7 +87,8 @@
                // format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
                File f = new File(
                        StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
-                        + (temp ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) : "")
+                        + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)
+                                : "")
+ File.separator + relPathFile);
                splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
}
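
The getIndexSplits rewrite above derives each split's relative path from the dataset's rebalance id, so a rebalanced copy of an index lands in a directory distinct from the original's. Roughly, under the path format named in the hunk's comment (the suffix placement below is an assumption; the authoritative logic is StoragePathUtil.prepareDataverseIndexName):

    // storageDir/partition_#/dataverse/dataset_idx_index, qualified by rebalance id
    String relPath = dataverseName + File.separator
            + datasetName + "_idx_" + indexName
            + (rebalanceId > 0 ? "_" + rebalanceId : "");   // hypothetical suffix format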
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
index cea0369..6365860 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
@@ -25,28 +25,14 @@
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.FloatPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringLowercasePointable;
import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
public class BinaryHashFunctionFactoryProvider implements IBinaryHashFunctionFactoryProvider, Serializable {
private static final long serialVersionUID = 1L;
    public static final BinaryHashFunctionFactoryProvider INSTANCE = new BinaryHashFunctionFactoryProvider();
-    public static final PointableBinaryHashFunctionFactory INTEGER_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(IntegerPointable.FACTORY);
-    public static final PointableBinaryHashFunctionFactory FLOAT_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(FloatPointable.FACTORY);
-    public static final PointableBinaryHashFunctionFactory DOUBLE_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(DoublePointable.FACTORY);
    public static final PointableBinaryHashFunctionFactory UTF8STRING_POINTABLE_INSTANCE =
            new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY);
-    // Equivalent to UTF8STRING_POINTABLE_INSTANCE but all characters are considered lower case to implement
-    // case-insensitive hashing.
-    public static final PointableBinaryHashFunctionFactory UTF8STRING_LOWERCASE_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(UTF8StringLowercasePointable.FACTORY);
private BinaryHashFunctionFactoryProvider() {
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 44aaef5..c73ff46 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -222,7 +222,7 @@
                createReplicationJob(ReplicationOperation.DELETE, resourceFile);
}
} else {
- throw new HyracksDataException("Resource doesn't exist");
+            throw new HyracksDataException("Resource doesn't exist " + relativePath);
}
}
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
index 324b7fd..cf2a593 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
@@ -23,6 +23,7 @@
public class DatasetIdFactory {
private static AtomicInteger id = new AtomicInteger();
private static boolean isInitialized = false;
+ private static int startId = 0;
public static boolean isInitialized() {
return isInitialized;
@@ -30,13 +31,19 @@
public static void initialize(int initialId) {
id.set(initialId);
+ startId = initialId;
isInitialized = true;
}
public static int generateDatasetId() {
+ id.compareAndSet(Integer.MAX_VALUE, startId);
return id.incrementAndGet();
}
+ public static int generateAlternatingDatasetId(int originalId) {
+ return originalId ^ 0x8000;
+ }
+
public static int getMostRecentDatasetId() {
return id.get();
}
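
A note on generateAlternatingDatasetId above: XOR-ing with 0x8000 flips a single bit, which is an involution, so applying it twice restores the original id. A rebalance can therefore give the rebuilt dataset an id distinct from the live one and toggle back on the next rebalance. Illustrative check (hypothetical test code, not part of the patch):

    int original = 42;
    int alternate = DatasetIdFactory.generateAlternatingDatasetId(original); // 42 ^ 0x8000 = 32810
    assert alternate != original;
    assert DatasetIdFactory.generateAlternatingDatasetId(alternate) == original; // involution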
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
index a057f40..95f5e1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
@@ -30,15 +30,8 @@
count)));
}
-    public static void addLocationChoiceConstraint(JobSpecification spec, IOperatorDescriptor op, String[][] choices) {
-        addPartitionCountConstraint(spec, op, choices.length);
-        for (int i = 0; i < choices.length; ++i) {
-            spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
-                    new ConstantExpression(choices[i])));
-        }
-    }
-
-    public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op, String... locations) {
+    public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op,
+            String... locations) {
addPartitionCountConstraint(spec, op, locations.length);
for (int i = 0; i < locations.length; ++i) {
            spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
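
The hunk above removes the unused location-choice variant and keeps absolute location constraints, which pin each partition of an operator to one node. A hedged usage sketch (operator and node names are illustrative, not from the patch):

    // Run scanOp with exactly two partitions, one on each named node controller.
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, "nc1", "nc2");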
--
To view, visit https://asterix-gerrit.ics.uci.edu/1768
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibda35252031fc4940972f0f19bbf796cadfa53d6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>