Yingyi Bu has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1768

Change subject: WIP: Add a dataset relocation REST API.
......................................................................

WIP: Add a dataset relocation REST API.
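
A quick manual check of the new endpoint (host and port are assumed local
defaults, not part of this change; the parameters mirror the included
runtime test):

  curl -X POST 'http://localhost:19002/rebalance?dataverseName=tpch&datasetName=LineItem&nodes="asterix_nc1"'

On success the servlet answers {"results":"successful"}.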

Change-Id: Ibda35252031fc4940972f0f19bbf796cadfa53d6
---
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
A asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
A asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
A asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
A asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
41 files changed, 1,023 insertions(+), 306 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/68/1768/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index 20c69c4..2dd4c3d 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -88,15 +88,8 @@
         int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
-        FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(),
-                dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(),
-                metadataProvider.isTemporaryDatasetWriteJob());
-        int[] datasetPartitions = new int[splitsForDataset.length];
-        for (int i = 0; i < splitsForDataset.length; i++) {
-            datasetPartitions[i] = i;
-        }
-        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider,
-                datasetPartitions, isSink);
+        IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, jobId, primaryKeyFields,
+                isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
index 421ee0e..396665e 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ConnectorApiServlet.java
@@ -105,8 +105,7 @@
                     return;
                 }
                 boolean temp = dataset.getDatasetDetails().isTemp();
-                FileSplit[] fileSplits =
-                        metadataProvider.splitsForDataset(mdTxnCtx, dataverseName, datasetName, datasetName, temp);
+                FileSplit[] fileSplits = metadataProvider.splitsForIndex(mdTxnCtx, dataset, datasetName);
                 ARecordType recordType = (ARecordType) metadataProvider.findType(dataset.getItemTypeDataverseName(),
                         dataset.getItemTypeName());
                 List<List<String>> primaryKeys = dataset.getPrimaryKeys();
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
new file mode 100644
index 0000000..56a40bc
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java
@@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.api.http.server;
+
+import static org.apache.asterix.api.http.servlet.ServletConstants.HYRACKS_CONNECTION_ATTR;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.file.StorageComponentProvider;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.utils.RebalanceUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.http.api.IServletRequest;
+import org.apache.hyracks.http.api.IServletResponse;
+import org.apache.hyracks.http.server.AbstractServlet;
+import org.apache.hyracks.http.server.utils.HttpUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+public class RebalanceApiServlet extends AbstractServlet {
+    private static final Logger LOGGER = Logger.getLogger(RebalanceApiServlet.class.getName());
+    private ICcApplicationContext appCtx;
+
+    public RebalanceApiServlet(ConcurrentMap<String, Object> ctx, String[] paths, ICcApplicationContext appCtx) {
+        super(ctx, paths);
+        this.appCtx = appCtx;
+    }
+
+    @Override
+    protected void post(IServletRequest request, IServletResponse response) {
+        response.setStatus(HttpResponseStatus.OK);
+        try {
+            HttpUtil.setContentType(response, HttpUtil.ContentType.TEXT_HTML, HttpUtil.Encoding.UTF8);
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "Failure setting content type", e);
+            response.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
+            return;
+        }
+        PrintWriter out = response.writer();
+        ObjectMapper om = new ObjectMapper();
+        ObjectNode jsonResponse = om.createObjectNode();
+        try {
+            // Gets dataverse, dataset, and target nodes for rebalance.
+            String dataverseName = request.getParameter("dataverseName");
+            String datasetName = request.getParameter("datasetName");
+            if (dataverseName == null || datasetName == null) {
+                jsonResponse.put("error", "Parameter dataverseName or datasetName is null.");
+                sendResult(out, jsonResponse);
+                return;
+            }
+            String[] nodes = StringUtils.strip(request.getParameter("nodes"), "\"'").split(",");
+
+            // Rebalances a given dataset from its current locations to the target nodes.
+            IHyracksClientConnection hcc = (IHyracksClientConnection) ctx.get(HYRACKS_CONNECTION_ATTR);
+            MetadataProvider metadataProvider = new MetadataProvider(appCtx, null, new StorageComponentProvider());
+            RebalanceUtil.rebalance(dataverseName, datasetName, Arrays.asList(nodes), metadataProvider, hcc);
+
+            // Writes the success result.
+            jsonResponse.put("results", "successful");
+            sendResult(out, jsonResponse);
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Failure handling a request", e);
+            jsonResponse.put("results", e.getMessage());
+            sendResult(out, jsonResponse);
+            e.printStackTrace(out);
+        }
+    }
+
+    private void sendResult(PrintWriter out, ObjectNode jsonResponse) {
+        out.write(jsonResponse.toString());
+        out.flush();
+    }
+}
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6a2b4e0..a5c4a90 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -612,9 +612,7 @@
                     MetadataUtil.PENDING_ADD_OP);
             MetadataManager.INSTANCE.addDataset(metadataProvider.getMetadataTxnContext(), dataset);
             if (dd.getDatasetType() == DatasetType.INTERNAL) {
-                Dataverse dataverse =
-                        MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName);
-                JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataverse, datasetName, metadataProvider);
+                JobSpecification jobSpec = DatasetUtil.createDatasetJobSpec(dataset, metadataProvider);
 
                 // #. make metadataTxn commit before calling runJob.
                 MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2429,7 +2427,6 @@
     }
 
     protected void handleCreateNodeGroupStatement(MetadataProvider metadataProvider, Statement stmt) throws Exception {
-
         NodegroupDecl stmtCreateNodegroup = (NodegroupDecl) stmt;
         String ngName = stmtCreateNodegroup.getNodegroupName().getValue();
 
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 57cc340..61d93f5 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -46,6 +46,7 @@
 import org.apache.asterix.api.http.server.QueryServiceServlet;
 import org.apache.asterix.api.http.server.QueryStatusApiServlet;
 import org.apache.asterix.api.http.server.QueryWebInterfaceServlet;
+import org.apache.asterix.api.http.server.RebalanceApiServlet;
 import org.apache.asterix.api.http.server.ShutdownApiServlet;
 import org.apache.asterix.api.http.server.UpdateApiServlet;
 import org.apache.asterix.api.http.server.VersionApiServlet;
@@ -226,6 +227,7 @@
         addServlet(jsonAPIServer, Servlets.SHUTDOWN);
         addServlet(jsonAPIServer, Servlets.VERSION);
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE);
+        addServlet(jsonAPIServer, Servlets.REBALANCE);
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_NODE_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.CLUSTER_STATE_CC_DETAIL); // must not precede add of CLUSTER_STATE
         addServlet(jsonAPIServer, Servlets.DIAGNOSTICS);
@@ -283,6 +285,8 @@
                         componentProvider);
             case Servlets.CONNECTOR:
                 return new ConnectorApiServlet(ctx, paths, appCtx);
+            case Servlets.REBALANCE:
+                return new RebalanceApiServlet(ctx, paths, appCtx);
             case Servlets.SHUTDOWN:
                 return new ShutdownApiServlet(ctx, paths);
             case Servlets.VERSION:
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index 039933a..b937137 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -43,4 +43,5 @@
         jobSpec.addRoot(frod);
         return jobSpec;
     }
+
 }
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
new file mode 100644
index 0000000..afc2709
--- /dev/null
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -0,0 +1,204 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.asterix.utils;
+
+import static org.apache.asterix.app.translator.QueryTranslator.abort;
+
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.utils.JobUtils;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.lock.LockList;
+import org.apache.asterix.metadata.lock.MetadataLockManager;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
+import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+
+public class RebalanceUtil {
+
+    private RebalanceUtil() {
+
+    }
+
+    public static void rebalance(String dataverseName, String datasetName, List<String> targetNcNames,
+            MetadataProvider metadataProvider, IHyracksClientConnection hcc) throws Exception {
+        // The source dataset.
+        Dataset sourceDataset = getSourceDataset(dataverseName, datasetName, metadataProvider);
+
+        // The new node group name.
+        String nodeGroupName = datasetName + "_" + (sourceDataset.getRebalanceId() + 1);
+
+        // Creates a node group for rebalance.
+        createNodeGroup(nodeGroupName, targetNcNames, metadataProvider);
+
+        // The target dataset for rebalance.
+        Dataset targetDataset = new Dataset(sourceDataset, true, nodeGroupName);
+
+        // Rebalances the source dataset into the target dataset.
+        rebalance(sourceDataset, targetDataset, metadataProvider, hcc);
+    }
+
+    private static Dataset getSourceDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider)
+            throws ACIDException, RemoteException, AlgebricksException {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset sourceDataset = null;
+        try {
+            sourceDataset = metadataProvider.findDataset(dataverseName, datasetName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+        return sourceDataset;
+    }
+
+    // Creates a node group for the rebalance target dataset.
+    private static void createNodeGroup(String ngName, List<String> ncNames, MetadataProvider metadataProvider)
+            throws Exception {
+        MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        MetadataLockManager.INSTANCE.acquireNodeGroupWriteLock(metadataProvider.getLocks(), ngName);
+        try {
+            NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, ngName);
+            if (ng == null) {
+                MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, new NodeGroup(ngName, ncNames));
+            }
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx);
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+    }
+
+    private static void rebalance(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Creates the rebalance target.
+        createRebalanceTarget(target, metadataProvider, hcc);
+
+        // Populates the data from the rebalance source to the rebalance target.
+        populateDataToRebalanceTarget(source, target, metadataProvider, hcc);
+
+        // Atomically switches the rebalance target to become the source dataset.
+        rebalanceSwitch(source, target, metadataProvider, hcc);
+    }
+
+    private static void rebalanceSwitch(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        MutableObject<JobUtils.ProgressState> progress = new MutableObject<>(JobUtils.ProgressState.NO_PROGRESS);
+        MutableObject<MetadataTransactionContext> mdTxnCtx = new MutableObject<>(
+                MetadataManager.INSTANCE.beginTransaction());
+        writeLockDataset(metadataProvider.getLocks(), source);
+        try {
+            MutableBoolean bActiveTxn = new MutableBoolean(true);
+            metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
+            List<JobSpecification> jobsToExecute = new ArrayList<>();
+            // Drops the original dataset's metadata entry
+            source.drop(metadataProvider, mdTxnCtx, jobsToExecute, bActiveTxn, progress, hcc);
+            // Adds a new rebalanced dataset entry in the metadata storage
+            MetadataManager.INSTANCE.addDataset(mdTxnCtx.getValue(), target);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx.getValue());
+        } catch (Exception e) {
+            abort(e, e, mdTxnCtx.getValue());
+            throw e;
+        } finally {
+            metadataProvider.getLocks().unlock();
+            metadataProvider.getLocks().reset();
+        }
+    }
+
+    private static void createRebalanceTarget(Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        // Creates the files for the rebalance target.
+        JobSpecification spec = DatasetUtil.createDatasetJobSpec(target, metadataProvider);
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+    private static void populateDataToRebalanceTarget(Dataset source, Dataset target, MetadataProvider metadataProvider,
+            IHyracksClientConnection hcc) throws Exception {
+        JobSpecification spec = new JobSpecification();
+        JobId jobId = JobIdFactory.generateJobId();
+        JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, true);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
+
+        // The pipeline starter.
+        IOperatorDescriptor ets = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
+
+        // Creates primary index scan op.
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId);
+
+        // Creates primary index upsert op.
+        IOperatorDescriptor upsertOp = DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
+
+        // The final commit operator.
+        IOperatorDescriptor commitOp = DatasetUtil.createUpsertCommitOp(spec, metadataProvider, jobId, target);
+
+        // Connects empty-tuple-source and scan.
+        spec.connect(new OneToOneConnectorDescriptor(spec), ets, 0, primaryScanOp, 0);
+
+        // Connects scan and upsert.
+        int numKeys = target.getPrimaryKeys().size();
+        int[] keys = new int[numKeys];
+        for (int i = 0; i < numKeys; ++i) {
+            keys[i] = i;
+        }
+        IConnectorDescriptor connectorDescriptor = new MToNPartitioningConnectorDescriptor(spec,
+                new FieldHashPartitionComputerFactory(keys, target.getPrimaryHashFunctionFactories(metadataProvider)));
+        spec.connect(connectorDescriptor, primaryScanOp, 0, upsertOp, 0);
+
+        // Connects upsert and sink.
+        spec.connect(new OneToOneConnectorDescriptor(spec), upsertOp, 0, commitOp, 0);
+
+        // Executes the job.
+        JobUtils.runJob(hcc, spec, true);
+    }
+
+    private static void writeLockDataset(LockList locks, Dataset dataset) {
+        MetadataLockManager.INSTANCE.acquireDataverseReadLock(locks, dataset.getDataverseName());
+        MetadataLockManager.INSTANCE.acquireDatasetWriteLock(locks,
+                dataset.getDataverseName() + "." + dataset.getDatasetName());
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 856638f..cbe551b 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -406,7 +406,7 @@
                     IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators, keyFieldTypes, false, true,
                     MetadataUtil.PENDING_NO_OP);
             List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId());
-            FileSplit[] splits = SplitsAndConstraintsUtil.getDatasetSplits(dataset, nodes, index.getIndexName(), false);
+            FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(dataset, index.getIndexName(), nodes);
             fileSplitProvider = new ConstantFileSplitProvider(Arrays.copyOfRange(splits, 0, 1));
         }
 
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
new file mode 100644
index 0000000..778e4ee
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/RebalanceTest.java
@@ -0,0 +1,66 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *   http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.asterix.test.runtime;
+
+import java.util.Collection;
+
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Runs the SQL++ rebalance runtime tests.
+ */
+@RunWith(Parameterized.class)
+public class RebalanceTest {
+    protected static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, new TestExecutor());
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "RebalanceTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("only_sqlpp.xml", "rebalance.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public RebalanceTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
new file mode 100644
index 0000000..92698ab
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.1.ddl.sqlpp
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop  dataverse tpch if exists;
+create  dataverse tpch;
+
+use tpch;
+
+
+create type tpch.LineItemType as
+ closed {
+  l_orderkey : bigint,
+  l_partkey : bigint,
+  l_suppkey : bigint,
+  l_linenumber : bigint,
+  l_quantity : double,
+  l_extendedprice : double,
+  l_discount : double,
+  l_tax : double,
+  l_returnflag : string,
+  l_linestatus : string,
+  l_shipdate : string,
+  l_commitdate : string,
+  l_receiptdate : string,
+  l_shipinstruct : string,
+  l_shipmode : string,
+  l_comment : string
+}
+
+create  dataset LineItem(LineItemType) primary key l_orderkey,l_linenumber;
+
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
new file mode 100644
index 0000000..8a59946
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.2.update.sqlpp
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+load  dataset LineItem using localfs ((`path`=`asterix_nc1://data/tpch0.001/lineitem.tbl`),
+      (`format`=`delimited-text`),(`delimiter`=`|`));
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
new file mode 100644
index 0000000..ca42f3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.3.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
new file mode 100644
index 0000000..f5a2a75
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.4.post.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/rebalance?dataverseName=tpch&datasetName=LineItem&nodes="asterix_nc1"
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
new file mode 100644
index 0000000..360a01f
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.5.get.http
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/connector?dataverseName=tpch&datasetName=LineItem
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
new file mode 100644
index 0000000..ca42f3c
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/rebalance/single_dataset/single_dataset.6.query.sqlpp
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use tpch;
+
+select value count(*) from LineItem;
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
new file mode 100644
index 0000000..d380494
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/rebalance.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ /*
+  ~  * Licensed to the Apache Software Foundation (ASF) under one
+  ~  * or more contributor license agreements.  See the NOTICE file
+  ~  * distributed with this work for additional information
+  ~  * regarding copyright ownership.  The ASF licenses this file
+  ~  * to you under the Apache License, Version 2.0 (the
+  ~  * "License"); you may not use this file except in compliance
+  ~  * with the License.  You may obtain a copy of the License at
+  ~  *
+  ~  *   http://www.apache.org/licenses/LICENSE-2.0
+  ~  *
+  ~  * Unless required by applicable law or agreed to in writing,
+  ~  * software distributed under the License is distributed on an
+  ~  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~  * KIND, either express or implied.  See the License for the
+  ~  * specific language governing permissions and limitations
+  ~  * under the License.
+  ~  */
+  -->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp">
+  <test-group name="rebalance">
+    <test-case FilePath="rebalance">
+      <compilation-unit name="single_dataset">
+        <output-dir compare="Text">single_dataset</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.3.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
new file mode 100644
index 0000000..4b9eb7d
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.4.adm
@@ -0,0 +1 @@
+{"results":"successful"}
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
new file mode 100644
index 0000000..4f0990e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.5.adm
@@ -0,0 +1 @@
+{"temp":false,"keys":"l_orderkey,l_linenumber","type":{"type":"org.apache.asterix.om.types.ARecordType","name":"LineItemType","open":false,"fields":[{"l_orderkey":{"type":"AInt64"}},{"l_partkey":{"type":"AInt64"}},{"l_suppkey":{"type":"AInt64"}},{"l_linenumber":{"type":"AInt64"}},{"l_quantity":{"type":"ADouble"}},{"l_extendedprice":{"type":"ADouble"}},{"l_discount":{"type":"ADouble"}},{"l_tax":{"type":"ADouble"}},{"l_returnflag":{"type":"AString"}},{"l_linestatus":{"type":"AString"}},{"l_shipdate":{"type":"AString"}},{"l_commitdate":{"type":"AString"}},{"l_receiptdate":{"type":"AString"}},{"l_shipinstruct":{"type":"AString"}},{"l_shipmode":{"type":"AString"}},{"l_comment":{"type":"AString"}}]},"splits":[{"ip":"127.0.0.1","path":"storage/partition_0/tpch/1/LineItem_idx_LineItem"},{"ip":"127.0.0.1","path":"storage/partition_1/tpch/1/LineItem_idx_LineItem"}]}
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
new file mode 100644
index 0000000..3a8dfe0
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/rebalance/single_dataset/single_dataset.6.adm
@@ -0,0 +1 @@
+6005
\ No newline at end of file
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index 3047ef5..f11461b 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -32,6 +32,7 @@
     public static final String QUERY_RESULT = "/query/service/result/*";
     public static final String QUERY_SERVICE = "/query/service";
     public static final String CONNECTOR = "/connector";
+    public static final String REBALANCE = "/rebalance";
     public static final String SHUTDOWN = "/admin/shutdown";
     public static final String VERSION = "/admin/version";
     public static final String RUNNING_REQUESTS = "/admin/requests/running/*";
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 1aa1474..d49487d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -37,7 +37,6 @@
     public static final String PARTITION_DIR_PREFIX = "partition_";
     public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
     public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
-    public static final String ADAPTER_INSTANCE_PREFIX = "adapter_";
 
     private StoragePathUtil() {
     }
@@ -61,16 +60,18 @@
         return storageDirName + File.separator + StoragePathUtil.PARTITION_DIR_PREFIX + partitonId;
     }
 
-    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName) {
-        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName));
+    public static String prepareDataverseIndexName(String dataverseName, String datasetName, String idxName,
+            long rebalanceId) {
+        return prepareDataverseIndexName(dataverseName, prepareFullIndexName(datasetName, idxName, rebalanceId));
     }
 
     public static String prepareDataverseIndexName(String dataverseName, String fullIndexName) {
         return dataverseName + File.separator + fullIndexName;
     }
 
-    private static String prepareFullIndexName(String datasetName, String idxName) {
-        return datasetName + DATASET_INDEX_NAME_SEPARATOR + idxName;
+    private static String prepareFullIndexName(String datasetName, String idxName, long rebalanceId) {
+        return (rebalanceId == 0 ? "" : rebalanceId + File.separator) + datasetName + DATASET_INDEX_NAME_SEPARATOR
+                + idxName;
     }
 
     public static int getPartitionNumFromName(String name) {
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
index 01ade10..c84a5bd 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java
@@ -26,7 +26,6 @@
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.metadata.api.IMetadataEntity;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
@@ -39,9 +38,9 @@
 import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.Function;
 import org.apache.asterix.metadata.entities.Index;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.metadata.entities.Library;
 import org.apache.asterix.metadata.entities.NodeGroup;
+import org.apache.asterix.metadata.utils.IndexUtil;
 
 /**
  * Caches metadata entities such that the MetadataManager does not have to
@@ -161,10 +160,7 @@
                 // Add the primary index associated with the dataset, if the dataset is an
                 // internal dataset.
                 if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                    InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
-                    Index index = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
-                            dataset.getDatasetName(), IndexType.BTREE, id.getPartitioningKey(),
-                            id.getKeySourceIndicator(), id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+                    Index index = IndexUtil.getPrimaryIndex(dataset);
                     addIndexIfNotExistsInternal(index);
                 }
 
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 272cced..673a5ae 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -335,10 +335,6 @@
         String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
         FileReference file = ioManager.getFileReference(metadataDeviceId, resourceName);
         index.setFile(file);
-        // this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
-        // a dataset that was not yet created
-        List<IVirtualBufferCache> virtualBufferCaches = appContext.getDatasetLifecycleManager()
-                .getVirtualBufferCaches(index.getDatasetId().getId(), metadataPartition.getIODeviceNum());
         ITypeTraits[] typeTraits = index.getTypeTraits();
         IBinaryComparatorFactory[] cmpFactories = index.getKeyBinaryComparatorFactory();
         int[] bloomFilterKeyFields = index.getBloomFilterKeyFields();
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
index ee5c9f3..b259744 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataIndex.java
@@ -19,7 +19,6 @@
 
 package org.apache.asterix.metadata.bootstrap;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -27,6 +26,7 @@
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -232,7 +232,9 @@
 
     @Override
     public String getFileNameRelativePath() {
-        return getDataverseName() + File.separator + getIndexedDatasetName() + "_idx_" + getIndexName();
+        // The rebalance Id for metadata dataset is always 0.
+        return StoragePathUtil.prepareDataverseIndexName(getDataverseName(), getIndexedDatasetName(), getIndexName(),
+                0);
     }
 
     @Override
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 69f4e03..b094801 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -82,7 +82,6 @@
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
 import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
@@ -306,6 +305,14 @@
         return MetadataManagerUtil.findType(mdTxnCtx, dataverse, typeName);
     }
 
+    public IAType findType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+    }
+
+    public IAType findMetaType(Dataset dataset) throws AlgebricksException {
+        return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
+    }
+
     public Feed findFeed(String dataverse, String feedName) throws AlgebricksException {
         return MetadataManagerUtil.findFeed(mdTxnCtx, dataverse, feedName);
     }
@@ -379,17 +386,6 @@
             throw new AlgebricksException(e);
         }
         return new Pair<>(dataScanner, constraint);
-    }
-
-    public IDataFormat getDataFormat(String dataverseName) throws CompilationException {
-        Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
-        IDataFormat format;
-        try {
-            format = (IDataFormat) Class.forName(dataverse.getDataFormat()).newInstance();
-        } catch (Exception e) {
-            throw new CompilationException(e);
-        }
-        return format;
     }
 
     public Dataverse findDataverse(String dataverseName) throws CompilationException {
@@ -760,10 +756,9 @@
         return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(dataverse);
     }
 
-    public FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
-            String targetIdxName, boolean temp) throws AlgebricksException {
-        return SplitsAndConstraintsUtil.getDatasetSplits(findDataset(dataverseName, datasetName), mdTxnCtx,
-                targetIdxName, temp);
+    public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
+            throws AlgebricksException {
+        return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx);
     }
 
     public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
@@ -860,79 +855,10 @@
                 fieldPermutation[i++] = idx;
             }
         }
-        try {
-            Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
-                    dataset.getDatasetName(), dataset.getDatasetName());
-            String itemTypeName = dataset.getItemTypeName();
-            String itemTypeDataverseName = dataset.getItemTypeDataverseName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, itemTypeDataverseName, itemTypeName).getDatatype();
-            ARecordType metaItemType = DatasetUtil.getMetaType(this, dataset);
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataset);
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
-            int[] primaryKeyFields = new int[numKeys];
-            for (i = 0; i < numKeys; i++) {
-                primaryKeyFields[i] = i;
-            }
-
-            boolean hasSecondaries = MetadataManager.INSTANCE
-                    .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
-
-            IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
-                    storaegComponentProvider, primaryIndex, jobId, IndexOperation.UPSERT, primaryKeyFields);
-            IIndexDataflowHelperFactory idfh =
-                    new IndexDataflowHelperFactory(storaegComponentProvider.getStorageManager(), splitsAndConstraint.first);
-            LSMPrimaryUpsertOperatorDescriptor op;
-            ITypeTraits[] outputTypeTraits =
-                    new ITypeTraits[recordDesc.getFieldCount() + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-
-            // add the previous record first
-            int f = 0;
-            outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            f++;
-            // add the previous meta second
-            if (dataset.hasMetaPart()) {
-                outputSerDes[f] =
-                        FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
-                f++;
-            }
-            // add the previous filter third
-            int fieldIdx = -1;
-            if (numFilterFields > 0) {
-                String filterField = DatasetUtil.getFilterField(dataset).get(0);
-                for (i = 0; i < itemType.getFieldNames().length; i++) {
-                    if (itemType.getFieldNames()[i].equals(filterField)) {
-                        break;
-                    }
-                }
-                fieldIdx = i;
-                outputTypeTraits[f] = FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-                outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
-                        .getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
-                f++;
-            }
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j + f] = recordDesc.getFields()[j];
-            }
-            RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
-                    context.getMissingWriterFactory(), modificationCallbackFactory, searchCallbackFactory,
-                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, fieldIdx, hasSecondaries);
-            return new Pair<>(op, splitsAndConstraint.second);
-
-        } catch (MetadataException me) {
-            throw new AlgebricksException(me);
-        }
+        return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
+                context.getMissingWriterFactory());
     }
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, IAdapterFactory 
adapterFactory, IDataFormat format)
@@ -1635,15 +1561,12 @@
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds)
             throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), 
ds.getDatasetName(), ds.getDatasetName(),
-                ds.getDatasetDetails().isTemp());
-        return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
+        return getSplitProviderAndConstraints(ds, ds.getDatasetName());
     }
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds,
             String indexName) throws AlgebricksException {
-        FileSplit[] splits = splitsForDataset(mdTxnCtx, ds.getDataverseName(), 
ds.getDatasetName(), indexName,
-                ds.getDatasetDetails().isTemp());
+        FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
 
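The splitsForDataset removal changes the call shape for every split lookup.
A minimal sketch of the replacement path, assuming an open metadata
transaction mdTxnCtx and an already-resolved Dataset entity:

    // Primary-index splits; pass a secondary index name to get its splits.
    FileSplit[] splits =
            metadataProvider.splitsForIndex(mdTxnCtx, dataset, dataset.getDatasetName());
    // Or, one level up, the provider/constraint pair consumed by job builders:
    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spc =
            metadataProvider.getSplitProviderAndConstraints(dataset);
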
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 34fa7bb..2dffc02 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -44,6 +44,9 @@
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
 import org.apache.asterix.external.indexing.IndexingConstants;
+import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.formats.nontagged.TypeTraitProvider;
 import org.apache.asterix.metadata.IDatasetDetails;
 import org.apache.asterix.metadata.MetadataCache;
 import org.apache.asterix.metadata.MetadataManager;
@@ -75,17 +78,23 @@
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
 import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
 import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -133,7 +142,9 @@
     private final IDatasetDetails datasetDetails;
     private final String metaTypeDataverseName;
     private final String metaTypeName;
+    private final long rebalanceId;
     private int pendingOp;
+
     /*
      * Transient (For caching)
      */
@@ -151,6 +162,30 @@
             String metaItemTypeDataverseName, String metaItemTypeName, String 
nodeGroupName, String compactionPolicy,
             Map<String, String> compactionPolicyProperties, IDatasetDetails 
datasetDetails, Map<String, String> hints,
             DatasetType datasetType, int datasetId, int pendingOp) {
+        this(dataverseName, datasetName, itemTypeDataverseName, itemTypeName, 
metaItemTypeDataverseName,
+                metaItemTypeName, nodeGroupName, compactionPolicy, 
compactionPolicyProperties, datasetDetails, hints,
+                datasetType, datasetId, pendingOp, 0L);
+    }
+
+    public Dataset(Dataset dataset) {
+        this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
+                dataset.metaTypeDataverseName, dataset.metaTypeName, 
dataset.nodeGroupName,
+                dataset.compactionPolicyFactory, 
dataset.compactionPolicyProperties, dataset.datasetDetails,
+                dataset.hints, dataset.datasetType, dataset.datasetId, 
dataset.pendingOp, dataset.rebalanceId);
+    }
+
+    public Dataset(Dataset dataset, boolean forRebalance, String 
nodeGroupName) {
+        this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
+                dataset.metaTypeDataverseName, dataset.metaTypeName, 
nodeGroupName, dataset.compactionPolicyFactory,
+                dataset.compactionPolicyProperties, dataset.datasetDetails, 
dataset.hints, dataset.datasetType,
+                forRebalance ? 
DatasetIdFactory.generateAlternatingDatasetId(dataset.datasetId) : 
dataset.datasetId,
+                dataset.pendingOp, forRebalance ? dataset.rebalanceId + 1 : 
dataset.rebalanceId);
+    }
+
+    public Dataset(String dataverseName, String datasetName, String 
itemTypeDataverseName, String itemTypeName,
+            String metaItemTypeDataverseName, String metaItemTypeName, String 
nodeGroupName, String compactionPolicy,
+            Map<String, String> compactionPolicyProperties, IDatasetDetails 
datasetDetails, Map<String, String> hints,
+            DatasetType datasetType, int datasetId, int pendingOp, long 
rebalanceId) {
         this.dataverseName = dataverseName;
         this.datasetName = datasetName;
         this.recordTypeName = itemTypeName;
@@ -165,13 +200,7 @@
         this.datasetId = datasetId;
         this.pendingOp = pendingOp;
         this.hints = hints;
-    }
-
-    public Dataset(Dataset dataset) {
-        this(dataset.dataverseName, dataset.datasetName, 
dataset.recordTypeDataverseName, dataset.recordTypeName,
-                dataset.metaTypeDataverseName, dataset.metaTypeName, 
dataset.nodeGroupName,
-                dataset.compactionPolicyFactory, 
dataset.compactionPolicyProperties, dataset.datasetDetails,
-                dataset.hints, dataset.datasetType, dataset.datasetId, 
dataset.pendingOp);
+        this.rebalanceId = rebalanceId;
     }
 
     public String getDataverseName() {
@@ -228,6 +257,10 @@
 
     public String getMetaItemTypeName() {
         return metaTypeName;
+    }
+
+    public long getRebalanceId() {
+        return rebalanceId;
     }
 
     public boolean hasMetaPart() {
@@ -591,31 +624,12 @@
         return Objects.hash(dataverseName, datasetName);
     }
 
-    public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] 
primaryKeyFields,
-            MetadataProvider metadataProvider, int[] datasetPartitions, 
boolean isSink) {
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider 
metadataProvider, JobId jobId,
+            int[] primaryKeyFields, boolean isSink) throws AlgebricksException 
{
+        int[] datasetPartitions = getDatasetPartitions(metadataProvider);
         return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
                 metadataProvider.isTemporaryDatasetWriteJob(), 
metadataProvider.isWriteTransaction(), datasetPartitions,
                 isSink);
-    }
-
-    /**
-     * Get the index dataflow helper factory for the dataset's primary index
-     *
-     * @param mdProvider
-     *            an instance of metadata provider that is used to fetch 
metadata information
-     * @throws AlgebricksException
-     */
-    public IResourceFactory getResourceFactory(MetadataProvider mdProvider) 
throws AlgebricksException {
-        if (getDatasetType() != DatasetType.INTERNAL) {
-            throw new AlgebricksException(ErrorCode.ASTERIX,
-                    
ErrorCode.COMPILATION_DATASET_TYPE_DOES_NOT_HAVE_PRIMARY_INDEX, 
getDatasetType());
-        }
-        Index index = mdProvider.getIndex(getDataverseName(), 
getDatasetName(), getDatasetName());
-        ARecordType recordType = (ARecordType) 
mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
-        ARecordType metaType = (ARecordType) 
mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtil.getMergePolicyFactory(this, 
mdProvider.getMetadataTxnContext());
-        return getResourceFactory(mdProvider, index, recordType, metaType, 
compactionInfo.first, compactionInfo.second);
     }
 
     public IFrameOperationCallbackFactory getFrameOpCallbackFactory() {
@@ -659,6 +673,41 @@
         return typeTraits;
     }
 
+    public RecordDescriptor getPrimaryRecordDescriptor(MetadataProvider 
metadataProvider) throws AlgebricksException {
+        List<List<String>> partitioningKeys = getPrimaryKeys();
+        int numPrimaryKeys = partitioningKeys.size();
+        ISerializerDeserializer[] primaryRecFields = new 
ISerializerDeserializer[numPrimaryKeys + 1
+                + (hasMetaPart() ? 1 : 0)];
+        ITypeTraits[] primaryTypeTraits = new ITypeTraits[numPrimaryKeys + 1 + 
(hasMetaPart() ? 1 : 0)];
+        ISerializerDeserializerProvider serdeProvider = 
metadataProvider.getFormat().getSerdeProvider();
+        List<Integer> indicators = null;
+        if (hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) 
getDatasetDetails()).getKeySourceIndicator();
+        }
+        ARecordType itemType = (ARecordType) metadataProvider.findType(this);
+        ARecordType metaType = (ARecordType) 
metadataProvider.findMetaType(this);
+
+        // Set the serde/traits for primary keys
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType = (indicators == null || indicators.get(i) == 0)
+                    ? itemType.getSubFieldType(partitioningKeys.get(i))
+                    : metaType.getSubFieldType(partitioningKeys.get(i));
+            primaryRecFields[i] = 
serdeProvider.getSerializerDeserializer(keyType);
+            primaryTypeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+        }
+
+        // Set the serde for the record field
+        primaryRecFields[numPrimaryKeys] = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(itemType);
+        primaryTypeTraits[numPrimaryKeys] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+        if (hasMetaPart()) {
+            // Set the serde and traits for the meta record field
+            primaryRecFields[numPrimaryKeys + 1] = 
SerializerDeserializerProvider.INSTANCE
+                    .getSerializerDeserializer(metaType);
+            primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
+        }
+        return new RecordDescriptor(primaryRecFields, primaryTypeTraits);
+    }
+
     public IBinaryComparatorFactory[] 
getPrimaryComparatorFactories(MetadataProvider metadataProvider,
             ARecordType recordType, ARecordType metaType) throws 
AlgebricksException {
         IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
@@ -671,12 +720,32 @@
             indicators = ((InternalDatasetDetails) 
getDatasetDetails()).getKeySourceIndicator();
         }
         for (int i = 0; i < numPrimaryKeys; i++) {
-            IAType keyType =
-                    (indicators == null || indicators.get(i) == 0) ? 
recordType.getSubFieldType(partitioningKeys.get(i))
-                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
+            IAType keyType = (indicators == null || indicators.get(i) == 0)
+                    ? recordType.getSubFieldType(partitioningKeys.get(i))
+                    : metaType.getSubFieldType(partitioningKeys.get(i));
             cmpFactories[i] = 
cmpFactoryProvider.getBinaryComparatorFactory(keyType, true);
         }
         return cmpFactories;
+    }
+
+    public IBinaryHashFunctionFactory[] 
getPrimaryHashFunctionFactories(MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        ARecordType recordType = (ARecordType) metadataProvider.findType(this);
+        ARecordType metaType = (ARecordType) 
metadataProvider.findMetaType(this);
+        List<List<String>> partitioningKeys = getPrimaryKeys();
+        int numPrimaryKeys = partitioningKeys.size();
+        IBinaryHashFunctionFactory[] hashFuncFactories = new 
IBinaryHashFunctionFactory[numPrimaryKeys];
+        List<Integer> indicators = null;
+        if (hasMetaPart()) {
+            indicators = ((InternalDatasetDetails) 
getDatasetDetails()).getKeySourceIndicator();
+        }
+        for (int i = 0; i < numPrimaryKeys; i++) {
+            IAType keyType =
+                    (indicators == null || indicators.get(i) == 0) ? 
recordType.getSubFieldType(partitioningKeys.get(i))
+                            : 
metaType.getSubFieldType(partitioningKeys.get(i));
+            hashFuncFactories[i] = 
BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(keyType);
+        }
+        return hashFuncFactories;
     }
 
     @Override
@@ -689,4 +758,14 @@
         }
         return bloomFilterKeyFields;
     }
+
+    protected int[] getDatasetPartitions(MetadataProvider metadataProvider) 
throws AlgebricksException {
+        FileSplit[] splitsForDataset = 
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this,
+                getDatasetName());
+        int[] datasetPartitions = new int[splitsForDataset.length];
+        for (int i = 0; i < splitsForDataset.length; i++) {
+            datasetPartitions[i] = i;
+        }
+        return datasetPartitions;
+    }
 }
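For orientation: the new copy constructor with forRebalance is meant to clone
a dataset onto a different node group. A sketch, where source and newNodeGroup
are illustrative placeholders:

    // Same name and types, new node group. The dataset id alternates and the
    // rebalance id is bumped, so the copy's dataset id and storage paths
    // cannot collide with the original's while both are alive.
    Dataset target = new Dataset(source, true, newNodeGroup);
    assert target.getRebalanceId() == source.getRebalanceId() + 1;
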
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
index df47c70..5243855 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Index.java
@@ -41,7 +41,6 @@
 
     private static final long serialVersionUID = 1L;
     public static final int RECORD_INDICATOR = 0;
-    public static final int META_INDICATOR = 1;
 
     private final String dataverseName;
     // Enforced to be unique within a dataverse.
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index c3c5023..3273a68 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -51,8 +51,10 @@
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ADateTime;
 import org.apache.asterix.om.base.AInt32;
+import org.apache.asterix.om.base.AInt64;
 import org.apache.asterix.om.base.AInt8;
 import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.base.AMutableInt64;
 import org.apache.asterix.om.base.AMutableString;
 import org.apache.asterix.om.base.AOrderedList;
 import org.apache.asterix.om.base.ARecord;
@@ -73,26 +75,26 @@
  */
 public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
     private static final long serialVersionUID = 1L;
-    // Field indexes of serialized Dataset in a tuple.
-    // First key field.
-    public static final int DATASET_DATAVERSENAME_TUPLE_FIELD_INDEX = 0;
-    // Second key field.
-    public static final int DATASET_DATASETNAME_TUPLE_FIELD_INDEX = 1;
     // Payload field containing serialized Dataset.
     public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
+    private static final String REBALANCE_ID_FIELD_NAME = "rebalanceId";
 
     @SuppressWarnings("unchecked")
     protected final ISerializerDeserializer<ARecord> recordSerDes =
             
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(MetadataRecordTypes.DATASET_RECORDTYPE);
     protected final transient AMutableInt32 aInt32;
     protected final transient ISerializerDeserializer<AInt32> aInt32Serde;
+    protected final transient AMutableInt64 aBigInt;
+    protected final transient ISerializerDeserializer<AInt64> aBigIntSerde;
     protected final transient ArrayBackedValueStorage fieldName = new 
ArrayBackedValueStorage();
 
     @SuppressWarnings("unchecked")
     protected DatasetTupleTranslator(boolean getTuple) {
         super(getTuple, 
MetadataPrimaryIndexes.DATASET_DATASET.getFieldCount());
         aInt32 = new AMutableInt32(-1);
+        aBigInt = new AMutableInt64(-1);
         aInt32Serde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
+        aBigIntSerde = 
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
     }
 
     @Override
@@ -107,7 +109,6 @@
     }
 
     protected Dataset createDatasetFromARecord(ARecord datasetRecord) throws 
HyracksDataException {
-
         String dataverseName =
                 ((AString) 
datasetRecord.getValueByPos(MetadataRecordTypes.DATASET_ARECORD_DATAVERSENAME_FIELD_INDEX))
                         .getStringValue();
@@ -258,9 +259,14 @@
             metaTypeName = ((AString) 
datasetRecord.getValueByPos(metaTypeNameIndex)).getStringValue();
         }
 
+        // Read the rebalance id if there is one.
+        int rebalanceIdIndex = 
datasetRecord.getType().getFieldIndex(REBALANCE_ID_FIELD_NAME);
+        long rebalanceId = rebalanceIdIndex >= 0
+                ? ((AInt64) 
datasetRecord.getValueByPos(rebalanceIdIndex)).getLongValue() : 0;
+
         return new Dataset(dataverseName, datasetName, typeDataverseName, 
typeName, metaTypeDataverseName, metaTypeName,
                 nodeGroupName, compactionPolicy, compactionPolicyProperties, 
datasetDetails, hints, datasetType,
-                datasetId, pendingOp);
+                datasetId, pendingOp, rebalanceId);
     }
 
     @Override
@@ -409,6 +415,16 @@
             stringSerde.serialize(aString, fieldValue.getDataOutput());
             recordBuilder.addField(fieldName, fieldValue);
         }
+        if (dataset.getRebalanceId() > 0) {
+            // Adds the field rebalanceId.
+            fieldName.reset();
+            aString.setValue("rebalanceId");
+            stringSerde.serialize(aString, fieldName.getDataOutput());
+            fieldValue.reset();
+            aBigInt.setValue(dataset.getRebalanceId());
+            aBigIntSerde.serialize(aBigInt, fieldValue.getDataOutput());
+            recordBuilder.addField(fieldName, fieldValue);
+        }
     }
 
     protected void writeDatasetDetailsRecordType(IARecordBuilder 
recordBuilder, Dataset dataset, DataOutput dataOutput)
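Since rebalanceId is serialized as an open field, and only when it is
positive, dataset records persisted before this change still deserialize and
default to 0. A hypothetical in-package round trip under that assumption
(exception handling elided):

    DatasetTupleTranslator translator = new DatasetTupleTranslator(true);
    ITupleReference tuple = translator.getTupleFromMetadataEntity(dataset);
    Dataset copy = translator.getMetadataEntityFromTuple(tuple);
    // getFieldIndex("rebalanceId") returns -1 for pre-rebalance records.
    assert copy.getRebalanceId() == dataset.getRebalanceId();
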
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
index 6e6f086..418d48f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/LockList.java
@@ -21,13 +21,16 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.asterix.metadata.lock.IMetadataLock.Mode;
 import org.apache.commons.lang3.tuple.Pair;
 
 public class LockList {
-    List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new ArrayList<>();
+    private List<Pair<IMetadataLock.Mode, IMetadataLock>> locks = new 
ArrayList<>();
+    private boolean unlocked = false;
 
     public void add(IMetadataLock.Mode mode, IMetadataLock lock) {
+        if (unlocked) {
+            throw new IllegalStateException("cannot acquire locks after unlock()");
+        }
         lock.acquire(mode);
         locks.add(Pair.of(mode, lock));
     }
@@ -37,9 +40,12 @@
             Pair<IMetadataLock.Mode, IMetadataLock> pair = locks.get(i);
             pair.getRight().release(pair.getLeft());
         }
+        locks.clear();
+        unlocked = true;
     }
 
     public void reset() {
-        locks.clear();
+        unlock();
+        unlocked = false;
     }
 }
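The unlocked flag makes the lock list single-shot per statement. The intended
lifecycle, sketched with a placeholder IMetadataLock:

    LockList locks = new LockList();
    locks.add(IMetadataLock.Mode.READ, lock); // acquire and record
    locks.unlock(); // releases everything; a further add() now throws
    locks.reset();  // re-arms the list for the next statement
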
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 098645e..6223bba 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.metadata.utils;
 
 import java.io.DataOutput;
-import java.io.File;
 import java.rmi.RemoteException;
 import java.util.List;
 import java.util.Map;
@@ -29,11 +28,18 @@
 import org.apache.asterix.builders.RecordBuilder;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
+import org.apache.asterix.common.context.IStorageComponentProvider;
+import org.apache.asterix.common.context.ITransactionSubsystemProvider;
+import org.apache.asterix.common.context.TransactionSubsystemProvider;
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataException;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -47,25 +53,43 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
+import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import 
org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
+import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import 
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
+import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
 import org.apache.hyracks.storage.common.IResourceFactory;
@@ -217,8 +241,7 @@
 
     public static JobSpecification dropDatasetJobSpec(Dataset dataset, 
MetadataProvider metadataProvider)
             throws AlgebricksException, HyracksDataException, RemoteException, 
ACIDException {
-        String datasetPath = dataset.getDataverseName() + File.separator + 
dataset.getDatasetName();
-        LOGGER.info("DROP DATASETPATH: " + datasetPath);
+        LOGGER.info("DROP DATASET: " + dataset);
         if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
             return 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
         }
@@ -249,41 +272,32 @@
         return spec;
     }
 
-    public static JobSpecification createDatasetJobSpec(Dataverse dataverse, 
String datasetName,
-            MetadataProvider metadataProvider) throws AlgebricksException {
-        String dataverseName = dataverse.getDataverseName();
-        Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
-        if (dataset == null) {
-            throw new AsterixException("Could not find dataset " + datasetName 
+ " in dataverse " + dataverseName);
-        }
-        Index index = 
MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(), 
dataverseName,
-                datasetName, datasetName);
-        ARecordType itemType =
-                (ARecordType) 
metadataProvider.findType(dataset.getItemTypeDataverseName(), 
dataset.getItemTypeName());
+    public static JobSpecification createDatasetJobSpec(Dataset dataset, 
MetadataProvider metadataProvider)
+            throws AlgebricksException {
+        Index index = IndexUtil.getPrimaryIndex(dataset);
+        ARecordType itemType = (ARecordType) 
metadataProvider.findType(dataset);
         // get meta item type
         ARecordType metaItemType = null;
         if (dataset.hasMetaPart()) {
-            metaItemType = (ARecordType) 
metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
-                    dataset.getMetaItemTypeName());
+            metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
         }
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                metadataProvider.getSplitProviderAndConstraints(dataset);
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
         FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
         StringBuilder sb = new StringBuilder();
         for (int i = 0; i < fs.length; i++) {
             sb.append(fs[i] + " ");
         }
         LOGGER.info("CREATING File Splits: " + sb.toString());
-
-        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                DatasetUtil.getMergePolicyFactory(dataset, 
metadataProvider.getMetadataTxnContext());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = 
DatasetUtil.getMergePolicyFactory(dataset,
+                metadataProvider.getMetadataTxnContext());
         //prepare a LocalResourceMetadata which will be stored in NC's local 
resource repository
         IResourceFactory resourceFactory = 
dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
                 compactionInfo.first, compactionInfo.second);
-        IndexBuilderFactory indexBuilderFactory =
-                new 
IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
-                        splitsAndConstraint.first, resourceFactory, 
!dataset.isTemp());
+        IndexBuilderFactory indexBuilderFactory = new IndexBuilderFactory(
+                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
splitsAndConstraint.first,
+                resourceFactory, !dataset.isTemp());
         IndexCreateOperatorDescriptor indexCreateOp = new 
IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
indexCreateOp,
                 splitsAndConstraint.second);
@@ -292,7 +306,7 @@
     }
 
     public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, 
String datasetName,
-            MetadataProvider metadataProvider) throws AsterixException, 
AlgebricksException {
+            MetadataProvider metadataProvider) throws AlgebricksException {
         String dataverseName = dataverse.getDataverseName();
         Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
         if (dataset == null) {
@@ -313,6 +327,167 @@
         return spec;
     }
 
+    public static IOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider 
metadataProvider,
+            Dataset dataset, JobId jobId) throws AlgebricksException {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
+        IFileSplitProvider primaryFileSplitProvider = 
primarySplitsAndConstraint.first;
+        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
+        // -Infinity
+        int[] lowKeyFields = null;
+        // +Infinity
+        int[] highKeyFields = null;
+        ITransactionSubsystemProvider txnSubsystemProvider = 
TransactionSubsystemProvider.INSTANCE;
+        boolean temp = dataset.getDatasetDetails().isTemp();
+        ISearchOperationCallbackFactory searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
+                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, 
dataset.getDatasetId(),
+                        dataset.getPrimaryBloomFilterFields(), 
txnSubsystemProvider,
+                        IRecoveryManager.ResourceType.LSM_BTREE);
+        IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
+                
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
+        BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec,
+                dataset.getPrimaryRecordDescriptor(metadataProvider), 
lowKeyFields, highKeyFields, true, true,
+                indexHelperFactory, false, false, null, searchCallbackFactory, 
null, null, false);
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
primarySearchOp,
+                primaryPartitionConstraint);
+        return primarySearchOp;
+    }
+
+    public static IOperatorDescriptor 
createPrimaryIndexUpsertOp(JobSpecification spec,
+            MetadataProvider metadataProvider, Dataset source, Dataset target) 
throws AlgebricksException {
+        int numKeys = source.getPrimaryKeys().size();
+        int numValues = source.hasMetaPart() ? 2 : 1;
+        int[] fieldPermutation = new int[numKeys + numValues];
+        for (int i = 0; i < fieldPermutation.length; ++i) {
+            fieldPermutation[i] = i;
+        }
+        Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
upsertOpAndConstraints = createPrimaryIndexUpsertOp(
+                spec, metadataProvider, target, 
source.getPrimaryRecordDescriptor(metadataProvider), fieldPermutation,
+                MissingWriterFactory.INSTANCE);
+        IOperatorDescriptor upsertOp = upsertOpAndConstraints.first;
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
upsertOp,
+                upsertOpAndConstraints.second);
+        return upsertOp;
+    }
+
+    public static IOperatorDescriptor createUpsertCommitOp(JobSpecification 
spec, MetadataProvider metadataProvider,
+            JobId jobId, Dataset target) throws AlgebricksException {
+        int numKeys = target.getPrimaryKeys().size();
+        int[] primaryKeyFields = new int[numKeys];
+        for (int i = 0; i < numKeys; i++) {
+            primaryKeyFields[i] = i;
+        }
+        AlgebricksMetaOperatorDescriptor metaOp = new 
AlgebricksMetaOperatorDescriptor(spec, 1, 0,
+                new IPushRuntimeFactory[] {
+                        target.getCommitRuntimeFactory(metadataProvider, 
jobId, primaryKeyFields, true) },
+                new RecordDescriptor[] { 
target.getPrimaryRecordDescriptor(metadataProvider) });
+        return metaOp;
+    }
+
+    public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
createPrimaryIndexUpsertOp(
+            JobSpecification spec, MetadataProvider metadataProvider, Dataset 
dataset, RecordDescriptor inputRecordDesc,
+            int[] fieldPermutation, IMissingWriterFactory 
missingWriterFactory) throws AlgebricksException {
+        int numKeys = dataset.getPrimaryKeys().size();
+        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
+        ARecordType itemType = (ARecordType) 
metadataProvider.findType(dataset);
+        ARecordType metaItemType = (ARecordType) 
metadataProvider.findMetaType(dataset);
+        try {
+            Index primaryIndex = 
metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
+                    dataset.getDatasetName());
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint = metadataProvider
+                    .getSplitProviderAndConstraints(dataset);
+
+            // prepare callback
+            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
+            int[] primaryKeyFields = new int[numKeys];
+            for (int i = 0; i < numKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+            boolean hasSecondaries = metadataProvider
+                    .getDatasetIndexes(dataset.getDataverseName(), 
dataset.getDatasetName()).size() > 1;
+            IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storageComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storageComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
+            IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
+                    storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
+            LSMPrimaryUpsertOperatorDescriptor op;
+            ITypeTraits[] outputTypeTraits = new 
ITypeTraits[inputRecordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+            ISerializerDeserializer<?>[] outputSerDes = new 
ISerializerDeserializer[inputRecordDesc.getFieldCount()
+                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+            // add the previous record first
+            int f = 0;
+            outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+            f++;
+            // add the previous meta second
+            if (dataset.hasMetaPart()) {
+                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
+                        .getSerializerDeserializer(metaItemType);
+                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                f++;
+            }
+            // add the previous filter third
+            int fieldIdx = -1;
+            if (numFilterFields > 0) {
+                String filterField = 
DatasetUtil.getFilterField(dataset).get(0);
+                String[] fieldNames = itemType.getFieldNames();
+                int i = 0;
+                for (; i < fieldNames.length; i++) {
+                    if (fieldNames[i].equals(filterField)) {
+                        break;
+                    }
+                }
+                fieldIdx = i;
+                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
+                        
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                f++;
+            }
+            for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+                outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+                outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+            }
+            RecordDescriptor outputRecordDesc = new 
RecordDescriptor(outputSerDes, outputTypeTraits);
+            op = new LSMPrimaryUpsertOperatorDescriptor(spec, 
outputRecordDesc, fieldPermutation, idfh,
+                    missingWriterFactory, modificationCallbackFactory, 
searchCallbackFactory,
+                    dataset.getFrameOpCallbackFactory(), numKeys, itemType, 
fieldIdx, hasSecondaries);
+            return new Pair<>(op, splitsAndConstraint.second);
+        } catch (MetadataException me) {
+            throw new AlgebricksException(me);
+        }
+    }
+
+    public static AbstractOperatorDescriptor 
createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
+            MetadataProvider metadataProvider) throws AlgebricksException {
+        Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint = metadataProvider
+                .getSplitProviderAndConstraints(dataset);
+        AlgebricksPartitionConstraint primaryPartitionConstraint = 
primarySplitsAndConstraint.second;
+
+        // Build dummy tuple containing one field with a dummy value inside.
+        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
+        DataOutput dos = tb.getDataOutput();
+        tb.reset();
+        try {
+            // Serialize dummy value into a field.
+            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
+        } catch (HyracksDataException e) {
+            throw new AsterixException(e);
+        }
+        // Add dummy field.
+        tb.addFieldEndOffset();
+        ISerializerDeserializer[] keyRecDescSers = { 
IntegerSerializerDeserializer.INSTANCE };
+        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
+        ConstantTupleSourceOperatorDescriptor keyProviderOp = new 
ConstantTupleSourceOperatorDescriptor(spec,
+                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
keyProviderOp,
+                primaryPartitionConstraint);
+        return keyProviderOp;
+    }
+
     public static boolean isFullyQualifiedName(String datasetName) {
         return datasetName.indexOf('.') > 0; //NOSONAR a fully qualified name 
can't start with a .
     }
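Taken together, these helpers can wire the core of a rebalance job: a full
scan of the source's primary index feeding an upsert into the target copy,
then a commit. A sketch, assuming source and target are the old and rebalanced
Dataset entities and the job is a write transaction:

    JobSpecification spec =
            RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
    JobId jobId = JobIdFactory.generateJobId();
    metadataProvider.setJobId(jobId);
    // The listener must be installed before the upsert op is created, since
    // createPrimaryIndexUpsertOp reads the JobId back from the spec.
    spec.setJobletEventListenerFactory(new JobEventListenerFactory(jobId, true));

    IOperatorDescriptor keys = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
    IOperatorDescriptor scan = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, jobId);
    IOperatorDescriptor upsert = DatasetUtil.createPrimaryIndexUpsertOp(spec, metadataProvider, source, target);
    IOperatorDescriptor commit = DatasetUtil.createUpsertCommitOp(spec, metadataProvider, jobId, target);

    spec.connect(new OneToOneConnectorDescriptor(spec), keys, 0, scan, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), scan, 0, upsert, 0);
    spec.connect(new OneToOneConnectorDescriptor(spec), upsert, 0, commit, 0);
    spec.addRoot(commit);
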
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 6d07fc7..4baeb55 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -21,6 +21,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.OptimizationConfUtil;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -28,6 +29,7 @@
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -53,6 +55,13 @@
         return secondaryFilterFields(dataset, index, filterTypeTraits);
     }
 
+    public static Index getPrimaryIndex(Dataset dataset) {
+        InternalDatasetDetails id = (InternalDatasetDetails) 
dataset.getDatasetDetails();
+        return new Index(dataset.getDataverseName(), dataset.getDatasetName(), 
dataset.getDatasetName(),
+                DatasetConfig.IndexType.BTREE, id.getPartitioningKey(), 
id.getKeySourceIndicator(),
+                id.getPrimaryKeyType(), false, true, dataset.getPendingOp());
+    }
+
     public static int[] getBtreeFieldsIfFiltered(Dataset dataset, Index index) 
throws AlgebricksException {
         if (index.isPrimaryIndex()) {
             return DatasetUtil.createBTreeFieldsWhenThereisAFilter(dataset);
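Since the primary index is fully determined by the dataset's own partitioning
metadata, it can now be synthesized without a MetadataManager lookup:

    // Sketch: no metadata round trip. Valid only for INTERNAL datasets,
    // because getDatasetDetails() is cast to InternalDatasetDetails.
    Index primaryIndex = IndexUtil.getPrimaryIndex(dataset);
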
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 1f7914d..4ce4708 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -20,8 +20,9 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -29,7 +30,9 @@
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
+import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import 
org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
@@ -46,12 +49,12 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
@@ -128,13 +131,21 @@
             return spec;
         } else {
             // Create dummy key provider for feeding the primary index scan.
-            AbstractOperatorDescriptor keyProviderOp = 
createDummyKeyProviderOp(spec);
+            AbstractOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+                    metadataProvider);
+            JobId jobId = JobIdFactory.generateJobId();
+            metadataProvider.setJobId(jobId);
+            boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+            IJobletEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(jobId,
+                    isWriteTransaction);
+            spec.setJobletEventListenerFactory(jobEventListenerFactory);
 
             // Create primary index scan op.
-            BTreeSearchOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanOp(spec);
+            IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+                    jobId);
 
             // Assign op.
-            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            IOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                 sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), 
primaryScanOp, 0, sourceOp, 0);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
index fa519e1..f0ce889 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryIndexOperationsHelper.java
@@ -19,19 +19,13 @@
 
 package org.apache.asterix.metadata.utils;
 
-import java.io.DataOutput;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.context.TransactionSubsystemProvider;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import 
org.apache.asterix.external.operators.ExternalIndexBulkLoadOperatorDescriptor;
@@ -52,9 +46,6 @@
 import org.apache.asterix.runtime.evaluators.functions.CastTypeDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.IsUnknownDescriptor;
 import org.apache.asterix.runtime.evaluators.functions.NotDescriptor;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -71,21 +62,11 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import 
org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import 
org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
-import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 
 @SuppressWarnings("rawtypes")
@@ -262,59 +243,13 @@
         primaryTypeTraits[numPrimaryKeys] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
         if (dataset.hasMetaPart()) {
             primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
-            primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
+            primaryTypeTraits[numPrimaryKeys + 1] = 
TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
         }
         primaryRecDesc = new RecordDescriptor(primaryRecFields, 
primaryTypeTraits);
     }
 
     protected abstract void setSecondaryRecDescAndComparators() throws 
AlgebricksException;
 
-    protected AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec) throws AlgebricksException {
-        // Build dummy tuple containing one field with a dummy value inside.
-        ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
-        DataOutput dos = tb.getDataOutput();
-        tb.reset();
-        try {
-            // Serialize dummy value into a field.
-            IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
-        } catch (HyracksDataException e) {
-            throw new AsterixException(e);
-        }
-        // Add dummy field.
-        tb.addFieldEndOffset();
-        ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
-        RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
-        ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
-                keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
-                primaryPartitionConstraint);
-        return keyProviderOp;
-    }
-
-    protected BTreeSearchOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec) throws AlgebricksException {
-        // -Infinity
-        int[] lowKeyFields = null;
-        // +Infinity
-        int[] highKeyFields = null;
-        ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
-        JobId jobId = JobIdFactory.generateJobId();
-        metadataProvider.setJobId(jobId);
-        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
-        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
-        spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        boolean temp = dataset.getDatasetDetails().isTemp();
-        ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
-                : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, dataset.getDatasetId(),
-                        primaryBloomFilterKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
-        IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
-                metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
-        BTreeSearchOperatorDescriptor primarySearchOp =
-                new BTreeSearchOperatorDescriptor(spec, primaryRecDesc, lowKeyFields, highKeyFields, true, true,
-                        indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false);
-        AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
-                primaryPartitionConstraint);
-        return primarySearchOp;
-    }
 
     protected AlgebricksMetaOperatorDescriptor createAssignOp(JobSpecification spec, int numSecondaryKeyFields,
             RecordDescriptor secondaryRecDesc) throws AlgebricksException {
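
Aside from dropping the two helpers relocated to DatasetUtil, the one functional change in the hunk above fixes the primary record descriptor for datasets with a meta part: the layout is [primary keys..., record, meta], and the meta column's type trait must be derived from metaType rather than itemType. A minimal sketch of the intended layout (variable names follow the hunk; illustration only, not part of the patch):

    // Layout: | pk_0 .. pk_{n-1} | record | meta (only if hasMetaPart) |
    primaryRecFields[numPrimaryKeys] = payloadSerde;
    primaryTypeTraits[numPrimaryKeys] = TypeTraitProvider.INSTANCE.getTypeTrait(itemType);
    if (dataset.hasMetaPart()) {
        primaryRecFields[numPrimaryKeys + 1] = payloadSerde;
        // Fixed above: the meta column's traits now come from metaType, not itemType.
        primaryTypeTraits[numPrimaryKeys + 1] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType);
    }
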
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 1bb1377..227435e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -18,10 +18,11 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -30,7 +31,9 @@
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.om.utils.RecordUtil;
 import org.apache.asterix.runtime.formats.FormatUtils;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -42,10 +45,12 @@
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
@@ -54,7 +59,6 @@
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -207,14 +211,21 @@
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
+        JobId jobId = JobIdFactory.generateJobId();
+        metadataProvider.setJobId(jobId);
+        boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+        IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId, isWriteTransaction);
+        spec.setJobletEventListenerFactory(jobEventListenerFactory);
 
         // Create dummy key provider for feeding the primary index scan.
-        AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+        AbstractOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+                metadataProvider);
 
         // Create primary index scan op.
-        BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+        IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+                jobId);
 
-        AbstractOperatorDescriptor sourceOp = primaryScanOp;
+        IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isEnforcingKeyTypes = index.isEnforcingKeyFileds();
         int numSecondaryKeys = index.getKeyFieldNames().size();
         if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
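
Both secondary-index helpers now delegate the dummy key provider and the primary scan to shared DatasetUtil methods. Judging purely from the call sites in this hunk, the new helpers presumably have signatures along these lines (signatures only, bodies elided; parameter names are guesses, the actual declarations live in DatasetUtil.java elsewhere in this change):

    // Inferred from call sites, NOT copied from the patch:
    public static AbstractOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
            MetadataProvider metadataProvider) throws AlgebricksException;

    // Full-range (-Infinity, +Infinity) scan over the primary BTree, formerly
    // SecondaryIndexOperationsHelper.createPrimaryIndexScanOp(spec):
    public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec,
            MetadataProvider metadataProvider, Dataset dataset, JobId jobId) throws AlgebricksException;
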
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 21fa754..ca47ccb 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -20,9 +20,10 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -35,7 +36,9 @@
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
+import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
+import org.apache.asterix.transaction.management.service.transaction.JobIdFactory;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
@@ -48,12 +51,12 @@
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
-import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
@@ -202,13 +205,21 @@
                 metadataProvider.getStorageComponentProvider().getStorageManager(), secondaryFileSplitProvider);
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             // Create dummy key provider for feeding the primary index scan.
-            AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
+            AbstractOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset,
+                    metadataProvider);
+            JobId jobId = JobIdFactory.generateJobId();
+            metadataProvider.setJobId(jobId);
+            boolean isWriteTransaction = metadataProvider.isWriteTransaction();
+            IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(jobId,
+                    isWriteTransaction);
+            spec.setJobletEventListenerFactory(jobEventListenerFactory);
 
             // Create primary index scan op.
-            BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
+            IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
+                    jobId);
 
             // Assign op.
-            AbstractOperatorDescriptor sourceOp = primaryScanOp;
+            IOperatorDescriptor sourceOp = primaryScanOp;
             if (isEnforcingKeyTypes && !enforcedItemType.equals(itemType)) {
                 sourceOp = createCastOp(spec, dataset.getDatasetType());
                 spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, sourceOp, 0);
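
The same four-step transactional setup (generate a JobId, register it with the MetadataProvider, wrap it in a JobEventListenerFactory, install the listener on the spec) now appears verbatim in both buildLoadingJobSpec() implementations. A hypothetical consolidation, not part of this patch, could hoist it into one helper:

    // Hypothetical helper (NOT in this change): all calls appear in the hunks above.
    private static JobId prepareJobTransaction(JobSpecification spec, MetadataProvider metadataProvider) {
        JobId jobId = JobIdFactory.generateJobId();
        metadataProvider.setJobId(jobId);
        spec.setJobletEventListenerFactory(
                new JobEventListenerFactory(jobId, metadataProvider.isWriteTransaction()));
        return jobId;
    }
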
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index db33dd5..36e9dc3 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -56,24 +56,23 @@
         return splits.toArray(new FileSplit[] {});
     }
 
-    public static FileSplit[] getDatasetSplits(Dataset dataset, MetadataTransactionContext mdTxnCtx,
-            String targetIdxName, boolean temp) throws AlgebricksException {
+    public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, MetadataTransactionContext mdTxnCtx)
+            throws AlgebricksException {
         try {
             List<String> nodeGroup =
                     MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames();
             if (nodeGroup == null) {
                 throw new AlgebricksException("Couldn't find node group " + dataset.getNodeGroupName());
             }
-            return getDatasetSplits(dataset, nodeGroup, targetIdxName, temp);
+            return getIndexSplits(dataset, indexName, nodeGroup);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
     }
 
-    public static FileSplit[] getDatasetSplits(Dataset dataset, List<String> nodes, String targetIdxName,
-            boolean temp) {
+    public static FileSplit[] getIndexSplits(Dataset dataset, String indexName, List<String> nodes) {
         File relPathFile = new File(StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
-                dataset.getDatasetName(), targetIdxName));
+                dataset.getDatasetName(), indexName, dataset.getRebalanceId()));
         String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
         List<FileSplit> splits = new ArrayList<>();
         for (String nd : nodes) {
@@ -88,7 +87,8 @@
                 // format: 'storage dir name'/partition_#/dataverse/dataset_idx_index
                 File f = new File(
                         StoragePathUtil.prepareStoragePartitionPath(storageDirName, nodePartitions[k].getPartitionId())
-                                + (temp ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) : "")
+                                + (dataset.isTemp() ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER)
+                                        : "")
                                 + File.separator + relPathFile);
                 splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
             }
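
getIndexSplits() now folds the dataset's rebalance id into the relative index path, so a rebalanced copy of a dataset lands in a different directory than the original and the two can coexist during relocation. A runnable approximation of the resulting layout (the exact suffix format produced by the new prepareDataverseIndexName(dataverse, dataset, index, rebalanceId) overload is an assumption here; only the 'storage dir name'/partition_#/dataverse/dataset_idx_index shape comes from the hunk above):

    import java.io.File;

    public final class IndexSplitPathDemo {
        // Approximates the on-disk split path; rebalance suffix form is assumed.
        static String indexPath(String storageDir, int partition, String dataverse,
                String dataset, String index, int rebalanceId) {
            String rel = dataverse + File.separator + dataset + "_idx_" + index
                    + (rebalanceId == 0 ? "" : "_" + rebalanceId);
            return storageDir + File.separator + "partition_" + partition + File.separator + rel;
        }

        public static void main(String[] args) {
            // e.g. storage/partition_0/Default/ds_idx_ds_1 after one rebalance
            System.out.println(indexPath("storage", 0, "Default", "ds", "ds", 1));
        }
    }
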
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
index cea0369..6365860 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryHashFunctionFactoryProvider.java
@@ -25,28 +25,14 @@
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.FloatPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringLowercasePointable;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class BinaryHashFunctionFactoryProvider implements IBinaryHashFunctionFactoryProvider, Serializable {
 
     private static final long serialVersionUID = 1L;
     public static final BinaryHashFunctionFactoryProvider INSTANCE = new BinaryHashFunctionFactoryProvider();
-    public static final PointableBinaryHashFunctionFactory INTEGER_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(IntegerPointable.FACTORY);
-    public static final PointableBinaryHashFunctionFactory FLOAT_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(FloatPointable.FACTORY);
-    public static final PointableBinaryHashFunctionFactory DOUBLE_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(DoublePointable.FACTORY);
     public static final PointableBinaryHashFunctionFactory UTF8STRING_POINTABLE_INSTANCE =
             new PointableBinaryHashFunctionFactory(UTF8StringPointable.FACTORY);
-    // Equivalent to UTF8STRING_POINTABLE_INSTANCE but all characters are considered lower case to implement
-    // case-insensitive hashing.
-    public static final PointableBinaryHashFunctionFactory UTF8STRING_LOWERCASE_POINTABLE_INSTANCE =
-            new PointableBinaryHashFunctionFactory(UTF8StringLowercasePointable.FACTORY);
 
     private BinaryHashFunctionFactoryProvider() {
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 44aaef5..c73ff46 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -222,7 +222,7 @@
                 createReplicationJob(ReplicationOperation.DELETE, resourceFile);
             }
         } else {
-            throw new HyracksDataException("Resource doesn't exist");
+            throw new HyracksDataException("Resource doesn't exist " + relativePath);
         }
     }
 
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
index 324b7fd..cf2a593 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/DatasetIdFactory.java
@@ -23,6 +23,7 @@
 public class DatasetIdFactory {
     private static AtomicInteger id = new AtomicInteger();
     private static boolean isInitialized = false;
+    private static int startId = 0;
 
     public static boolean isInitialized() {
         return isInitialized;
@@ -30,13 +31,19 @@
 
     public static void initialize(int initialId) {
         id.set(initialId);
+        startId = initialId;
         isInitialized = true;
     }
 
     public static int generateDatasetId() {
+        id.compareAndSet(Integer.MAX_VALUE, startId);
         return id.incrementAndGet();
     }
 
+    public static int generateAlternatingDatasetId(int originalId) {
+        return originalId ^ 0x8000;
+    }
+
     public static int getMostRecentDatasetId() {
         return id.get();
     }
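
Two behaviors are added here: generateDatasetId() now wraps the counter back to startId instead of overflowing at Integer.MAX_VALUE, and generateAlternatingDatasetId() flips bit 15 (0x8000) of the original id. Because XOR with a fixed mask is an involution, the rebalanced shadow dataset always gets an id distinct from the original, and applying the function again restores the original id. A self-contained demo of that property:

    public final class AlternatingIdDemo {
        // Mirrors DatasetIdFactory.generateAlternatingDatasetId(int).
        static int alternate(int originalId) {
            return originalId ^ 0x8000;
        }

        public static void main(String[] args) {
            int id = 42;
            int shadow = alternate(id);  // 42 ^ 0x8000 = 32810
            System.out.println(id + " -> " + shadow + " -> " + alternate(shadow));
            // prints: 42 -> 32810 -> 42 (toggling twice restores the id)
        }
    }
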
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
index a057f40..95f5e1a 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/constraints/PartitionConstraintHelper.java
@@ -30,15 +30,8 @@
                 count)));
     }
 
-    public static void addLocationChoiceConstraint(JobSpecification spec, IOperatorDescriptor op, String[][] choices) {
-        addPartitionCountConstraint(spec, op, choices.length);
-        for (int i = 0; i < choices.length; ++i) {
-            spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
-                    new ConstantExpression(choices[i])));
-        }
-    }
-
-    public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op, String... locations) {
+    public static void addAbsoluteLocationConstraint(JobSpecification spec, IOperatorDescriptor op,
+            String... locations) {
         addPartitionCountConstraint(spec, op, locations.length);
         for (int i = 0; i < locations.length; ++i) {
             spec.addUserConstraint(new Constraint(new PartitionLocationExpression(op.getOperatorId(), i),
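
The unused addLocationChoiceConstraint() is dropped and addAbsoluteLocationConstraint() is only re-wrapped; its behavior is unchanged. As the body above shows, it fixes the operator's partition count to locations.length and pins partition i to locations[i]. A hypothetical call site ('spec' and 'scanOp' are stand-ins, not names from this patch):

    // Pin a 3-partition operator to three specific node controllers:
    PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, "nc1", "nc2", "nc3");
    // Internally: one partition-count constraint (3) plus one
    // PartitionLocationExpression -> ConstantExpression binding per partition.
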

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1768
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ibda35252031fc4940972f0f19bbf796cadfa53d6
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Yingyi Bu <[email protected]>
