abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1523

Change subject: Cleanup and bug fixes in Feeds pipeline
......................................................................

Cleanup and bug fixes in Feeds pipeline

Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
M hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
R hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
38 files changed, 498 insertions(+), 495 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/23/1523/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
index d785cce..da93fb8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/CCMessageBroker.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.messaging;
 
-import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -32,7 +31,7 @@
 
 public class CCMessageBroker implements ICCMessageBroker {
 
-    private final static Logger LOGGER = 
Logger.getLogger(CCMessageBroker.class.getName());
+    private static final Logger LOGGER = 
Logger.getLogger(CCMessageBroker.class.getName());
     private final ClusterControllerService ccs;
 
     public CCMessageBroker(ClusterControllerService ccs) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index cbb4868..1a8ccae 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
 import org.apache.hyracks.storage.am.common.api.IIndex;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
 import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
@@ -72,7 +73,7 @@
     }
 
     @Override
-    public synchronized IIndex get(String resourcePath) throws 
HyracksDataException {
+    public synchronized ILSMIndex get(String resourcePath) throws 
HyracksDataException {
         validateDatasetLifecycleManagerState();
         int datasetID = getDIDfromResourcePath(resourcePath);
         long resourceID = getResourceIDfromResourcePath(resourcePath);
@@ -80,7 +81,7 @@
     }
 
     @Override
-    public synchronized IIndex getIndex(int datasetID, long resourceID) throws 
HyracksDataException {
+    public synchronized ILSMIndex getIndex(int datasetID, long resourceID) 
throws HyracksDataException {
         validateDatasetLifecycleManagerState();
         DatasetResource datasetResource = datasets.get(datasetID);
         if (datasetResource == null) {
@@ -556,7 +557,7 @@
                 while (used + additionalSize > capacity) {
                     if (!evictCandidateDataset()) {
                         throw new HyracksDataException("Cannot allocate 
dataset " + dsInfo.getDatasetID()
-                                + " memory since memory budget would be 
exceeded.");
+                        + " memory since memory budget would be exceeded.");
                     }
                 }
                 used += additionalSize;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index 403d3cb..41e587d 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -81,7 +81,7 @@
         return datasetVirtualBufferCaches;
     }
 
-    public IIndex getIndex(long resourceID) {
+    public ILSMIndex getIndex(long resourceID) {
         IndexInfo iInfo = getIndexInfo(resourceID);
         return (iInfo == null) ? null : iInfo.getIndex();
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
index b9d187d..001240b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ExceptionUtils.java
@@ -37,7 +37,7 @@
 
     public static HyracksDataException 
suppressIntoHyracksDataException(HyracksDataException hde, Throwable th) {
         if (hde == null) {
-            return new HyracksDataException(th);
+            return (th instanceof HyracksDataException) ? 
(HyracksDataException) th : new HyracksDataException(th);
         } else {
             hde.addSuppressed(th);
             return hde;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index f903b65..6fa45f0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import 
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -33,7 +33,7 @@
 
 // A single LSMIOOperationCallback per LSM index used to perform actions 
around Flush and Merge operations
 public abstract class AbstractLSMIOOperationCallback implements 
ILSMIOOperationCallback {
-    public static final MutableArrayValueReference LSN_KEY = new 
MutableArrayValueReference("LSN".getBytes());
+    public static final ArrayValueReference LSN_KEY = new 
ArrayValueReference("LSN".getBytes());
     public static final long INVALID = -1L;
 
     // First LSN per mutable component
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
index b977c4d..bbe2c4f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -119,4 +119,8 @@
         pointable.setLong(lsn);
         
index.getCurrentMemoryComponent().getMetadata().put(ComponentMetadataUtil.MARKER_LSN_KEY,
 pointable);
     }
+
+    public ILSMIndex getIndex() {
+        return index;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index d03f9df..830a4ec 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -61,7 +61,7 @@
     private boolean isFeed;
     private FileSplit[] feedLogFileSplits;
     private ARecordType metaType;
-    private FeedLogManager feedLogManager = null;
+    private transient FeedLogManager feedLogManager;
 
     @Override
     public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
@@ -75,8 +75,7 @@
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AlgebricksException {
         return dataSourceFactory.getPartitionConstraint();
     }
 
@@ -86,8 +85,8 @@
     @Override
     public synchronized IDataSourceAdapter createAdapter(IHyracksTaskContext 
ctx, int partition)
             throws HyracksDataException {
-        IAppRuntimeContext runtimeCtx = (IAppRuntimeContext) 
ctx.getJobletContext()
-                .getApplicationContext().getApplicationObject();
+        IAppRuntimeContext runtimeCtx =
+                (IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject();
         try {
             restoreExternalObjects(runtimeCtx.getLibraryManager());
         } catch (Exception e) {
@@ -184,6 +183,7 @@
         this.metaType = metaType;
     }
 
+    @Override
     public IExternalDataSourceFactory getDataSourceFactory() {
         return dataSourceFactory;
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
index 3ea3bb1..5b8a101 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IAdapterFactory.java
@@ -40,7 +40,7 @@
      *
      * @return the display name
      */
-    public String getAlias();
+    String getAlias();
 
     /**
      * Gets a list of partition constraints. A partition constraint can be a
@@ -54,10 +54,8 @@
      * running on the node with the given IP address.
      *
      * @throws AlgebricksException
-     * @throws HyracksDataException
      */
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException;
+    AlgebricksAbsolutePartitionConstraint getPartitionConstraint() throws 
AlgebricksException;
 
     /**
      * Creates an instance of IDatasourceAdapter.
@@ -67,7 +65,7 @@
      * @return An instance of IDatasourceAdapter.
      * @throws Exception
      */
-    public IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int 
partition) throws HyracksDataException;
+    IDataSourceAdapter createAdapter(IHyracksTaskContext ctx, int partition) 
throws HyracksDataException;
 
     /**
      * @param libraryManager
@@ -75,14 +73,16 @@
      * @throws AlgebricksException
      * @throws HyracksDataException
      */
-    public void configure(ILibraryManager libraryManager, Map<String, String> 
configuration)
+    void configure(ILibraryManager libraryManager, Map<String, String> 
configuration)
             throws HyracksDataException, AlgebricksException;
 
-    public void setOutputType(ARecordType outputType);
+    void setOutputType(ARecordType outputType);
 
-    public void setMetaType(ARecordType metaType);
+    void setMetaType(ARecordType metaType);
 
-    public ARecordType getOutputType();
+    ARecordType getOutputType();
 
-    public ARecordType getMetaType();
+    ARecordType getMetaType();
+
+    IExternalDataSourceFactory getDataSourceFactory();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
index e2274b9..656f17b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IExternalDataSourceFactory.java
@@ -54,7 +54,7 @@
      * @throws AsterixException
      */
     public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws AlgebricksException, HyracksDataException;
+            throws AlgebricksException;
 
     /**
      * Configure the data parser factory. The passed map contains key value 
pairs from the
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index d85fe65..57e79c3 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.io.IOException;
-
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordWithPKDataParser;
@@ -34,9 +32,9 @@
 
     public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final 
FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithPKDataParser<T> dataParser, final 
IRecordReader<T> recordReader, boolean sendMarker)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader, sendMarker);
+            final IRecordWithPKDataParser<T> dataParser, final 
IRecordReader<T> recordReader)
+                    throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
         this.dataParser = dataParser;
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 4c88b0f..22fa8be 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -18,8 +18,6 @@
  */
 package org.apache.asterix.external.dataflow;
 
-import java.io.IOException;
-
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.api.IRecordWithMetadataParser;
@@ -32,14 +30,14 @@
 
     public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, 
final FeedTupleForwarder tupleForwarder,
             final FeedLogManager feedLogManager, final int numOfOutputFields,
-            final IRecordWithMetadataParser<T> dataParser, final 
IRecordReader<T> recordReader, boolean sendMarker)
-            throws HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader, sendMarker);
+            final IRecordWithMetadataParser<T> dataParser, final 
IRecordReader<T> recordReader)
+                    throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
     }
 
     @Override
     protected void addPrimaryKeys(final ArrayTupleBuilder tb, final 
IRawRecord<? extends T> record)
             throws HyracksDataException {
-        ((IRecordWithMetadataParser<T>) 
dataParser).appendLastParsedPrimaryKeyToTuple(tb);
+        dataParser.appendLastParsedPrimaryKeyToTuple(tb);
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 2e687ba..73b9e52 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,71 +19,55 @@
 package org.apache.asterix.external.dataflow;
 
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.exceptions.ExceptionUtils;
-import org.apache.asterix.external.api.IFeedMarker;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import org.apache.asterix.external.api.IRecordReader;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.log4j.Logger;
 
 public class FeedRecordDataFlowController<T> extends 
AbstractFeedDataFlowController {
     private static final Logger LOGGER = 
Logger.getLogger(FeedRecordDataFlowController.class.getName());
-    protected final IRecordDataParser<T> dataParser;
-    protected final IRecordReader<? extends T> recordReader;
+    private final IRecordDataParser<T> dataParser;
+    private final IRecordReader<? extends T> recordReader;
     protected final AtomicBoolean closed = new AtomicBoolean(false);
     protected static final long INTERVAL = 1000;
-    protected final Object mutex = new Object();
-    protected final boolean sendMarker;
     protected boolean failed = false;
-    private FeedRecordDataFlowController<T>.DataflowMarker dataflowMarker;
-    private Future<?> dataflowMarkerResult;
 
     public FeedRecordDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, 
IRecordDataParser<T> dataParser,
-            IRecordReader<T> recordReader, boolean sendMarker) throws 
HyracksDataException {
+            IRecordReader<T> recordReader) throws HyracksDataException {
         super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
-        this.sendMarker = sendMarker;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
     }
 
     @Override
     public void start(IFrameWriter writer) throws HyracksDataException {
-        startDataflowMarker();
         HyracksDataException hde = null;
         try {
             failed = false;
             tupleForwarder.initialize(ctx, writer);
             while (recordReader.hasNext()) {
-                // synchronized on mutex before we call next() so we don't a 
marker before its record
-                synchronized (mutex) {
-                    IRawRecord<? extends T> record = recordReader.next();
-                    if (record == null) {
-                        flush();
-                        mutex.wait(INTERVAL);
-                        continue;
+                IRawRecord<? extends T> record = recordReader.next();
+                if (record == null) {
+                    flush();
+                    synchronized (this) {
+                        wait(INTERVAL);
                     }
-                    tb.reset();
-                    parseAndForward(record);
+                    continue;
                 }
+                tb.reset();
+                parseAndForward(record);
             }
         } catch (InterruptedException e) {
             //TODO: Find out what could cause an interrupted exception beside 
termination of a job/feed
@@ -95,7 +79,6 @@
             LOGGER.warn("Failure while operating a feed source", e);
             throw new HyracksDataException(e);
         }
-        stopDataflowMarker();
         try {
             tupleForwarder.close();
         } catch (Throwable th) {
@@ -108,9 +91,6 @@
             hde = ExceptionUtils.suppressIntoHyracksDataException(hde, th);
         } finally {
             closeSignal();
-            if (sendMarker && dataflowMarkerResult != null) {
-                dataflowMarkerResult.cancel(true);
-            }
         }
         if (hde != null) {
             throw hde;
@@ -118,41 +98,24 @@
     }
 
     private void parseAndForward(IRawRecord<? extends T> record) throws 
IOException {
-        synchronized (dataParser) {
-            try {
-                dataParser.parse(record, tb.getDataOutput());
-            } catch (Exception e) {
-                LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
-                feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
-                // continue the outer loop
-                return;
-            }
-            tb.addFieldEndOffset();
-            addMetaPart(tb, record);
-            addPrimaryKeys(tb, record);
-            tupleForwarder.addTuple(tb);
+        try {
+            dataParser.parse(record, tb.getDataOutput());
+        } catch (Exception e) {
+            LOGGER.warn(ExternalDataConstants.ERROR_PARSE_RECORD, e);
+            feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
+            // continue the outer loop
+            return;
         }
+        tb.addFieldEndOffset();
+        addMetaPart(tb, record);
+        addPrimaryKeys(tb, record);
+        tupleForwarder.addTuple(tb);
     }
 
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> 
record) throws IOException {
     }
 
     protected void addPrimaryKeys(ArrayTupleBuilder tb, IRawRecord<? extends 
T> record) throws IOException {
-    }
-
-    private void startDataflowMarker() {
-        ExecutorService executorService = sendMarker ? 
Executors.newSingleThreadExecutor() : null;
-        if (sendMarker && dataflowMarker == null) {
-            dataflowMarker = new 
DataflowMarker(recordReader.getProgressReporter(),
-                    TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, 
ctx));
-            dataflowMarkerResult = executorService.submit(dataflowMarker);
-        }
-    }
-
-    private void stopDataflowMarker() {
-        if (dataflowMarker != null) {
-            dataflowMarker.stop();
-        }
     }
 
     private void closeSignal() {
@@ -172,7 +135,6 @@
 
     @Override
     public boolean stop() throws HyracksDataException {
-        stopDataflowMarker();
         HyracksDataException hde = null;
         if (recordReader.stop()) {
             if (failed) {
@@ -208,52 +170,11 @@
         return recordReader.handleException(th);
     }
 
-    private class DataflowMarker implements Runnable {
-        private final IFeedMarker marker;
-        private final VSizeFrame mark;
-        private volatile boolean stopped = false;
+    public IRecordReader<? extends T> getReader() {
+        return recordReader;
+    }
 
-        public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
-            this.marker = marker;
-            this.mark = mark;
-        }
-
-        public synchronized void stop() {
-            stopped = true;
-            notify();
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    synchronized (this) {
-                        if (!stopped) {
-                            // TODO (amoudi): find a better reactive way to do 
this
-                            // sleep for two seconds
-                            wait(TimeUnit.SECONDS.toMillis(2));
-                        } else {
-                            break;
-                        }
-                    }
-                    synchronized (mutex) {
-                        if (marker.mark(mark)) {
-                            // broadcast
-                            tupleForwarder.flush();
-                            // clear
-                            mark.getBuffer().clear();
-                            
mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-                            mark.getBuffer().flip();
-                        }
-                    }
-                }
-            } catch (InterruptedException e) {
-                LOGGER.warn("Marker stopped", e);
-                Thread.currentThread().interrupt();
-                return;
-            } catch (Exception e) {
-                LOGGER.warn("Marker stopped", e);
-            }
-        }
+    public IRecordDataParser<T> getParser() {
+        return dataParser;
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index d31e074..4177ea6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -28,12 +28,9 @@
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
-import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
 public class FeedTupleForwarder implements ITupleForwarder {
 
@@ -58,11 +55,6 @@
             this.frame = new VSizeFrame(ctx);
             this.writer = writer;
             this.appender = new FrameTupleAppender(frame);
-            // Set null feed message
-            VSizeFrame message = TaskUtil.<VSizeFrame> 
get(HyracksConstants.KEY_MESSAGE, ctx);
-            // a null message
-            
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
-            message.getBuffer().flip();
             initialized = true;
         }
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 45ae52b..c7f6d9c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -28,15 +28,18 @@
 
 public class FeedWithMetaDataFlowController<T> extends 
FeedRecordDataFlowController<T> {
 
+    protected final IRecordWithMetadataParser<T> dataParser;
+
     public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, 
FeedTupleForwarder tupleForwarder,
             FeedLogManager feedLogManager, int numOfOutputFields, 
IRecordWithMetadataParser<T> dataParser,
-            IRecordReader<T> recordReader, boolean sendMarker) throws 
HyracksDataException {
-        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader, sendMarker);
+            IRecordReader<T> recordReader) throws HyracksDataException {
+        super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
+        this.dataParser = dataParser;
     }
 
     @Override
     protected void addMetaPart(ArrayTupleBuilder tb, IRawRecord<? extends T> 
record) throws HyracksDataException {
-        ((IRecordWithMetadataParser<T>) 
dataParser).parseMeta(tb.getDataOutput());
+        dataParser.parseMeta(tb.getDataOutput());
         tb.addFieldEndOffset();
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
index 4649559..9b23e38 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/stream/StreamRecordReaderFactory.java
@@ -52,8 +52,7 @@
     }
 
     @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint()
-            throws HyracksDataException, AlgebricksException {
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() 
throws AlgebricksException {
         return streamFactory.getPartitionConstraint();
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
index 99fff19..546946a 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedIntakeOperatorNodePushable.java
@@ -27,11 +27,13 @@
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
 import org.apache.asterix.external.feed.runtime.AdapterRuntimeManager;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
 import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 
 /**
@@ -63,7 +65,12 @@
             Thread.currentThread().setName("Intake Thread");
             FeedAdapter adapter = (FeedAdapter) 
adapterFactory.createAdapter(ctx, partition);
             adapterRuntimeManager = new AdapterRuntimeManager(ctx, 
runtimeId.getEntityId(), adapter, writer, partition);
-            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, new 
VSizeFrame(ctx), ctx);
+            IFrame message = new VSizeFrame(ctx);
+            TaskUtil.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, 
ctx);
+            // Initialize the shared message frame with the null feed message,
+            // then flip its buffer so the content is ready to be read.
+            
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+            message.getBuffer().flip();
             adapterRuntimeManager.start();
             synchronized (adapterRuntimeManager) {
                 while (!adapterRuntimeManager.isDone()) {
@@ -82,7 +89,7 @@
              */
             throw new HyracksDataException(ie);
         } finally {
-                writer.close();
+            writer.close();
         }
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index a369fe3..94bcdce 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -61,7 +61,7 @@
     public static IDataFlowController getDataflowController(ARecordType 
recordType, IHyracksTaskContext ctx,
             int partition, IExternalDataSourceFactory dataSourceFactory, 
IDataParserFactory dataParserFactory,
             Map<String, String> configuration, boolean indexingOp, boolean 
isFeed, FeedLogManager feedLogManager)
-            throws HyracksDataException {
+                    throws HyracksDataException {
         try {
             switch (dataSourceFactory.getDataSourceType()) {
                 case RECORDS:
@@ -69,7 +69,6 @@
                     IRecordReader<?> recordReader = 
recordReaderFactory.createRecordReader(ctx, partition);
                     IRecordDataParserFactory<?> recordParserFactory = 
(IRecordDataParserFactory<?>) dataParserFactory;
                     IRecordDataParser<?> dataParser = 
recordParserFactory.createRecordParser(ctx);
-                    boolean sendMarker = 
ExternalDataUtils.isSendMarker(configuration);
                     if (indexingOp) {
                         return new IndexingDataFlowController(ctx,
                                 DataflowUtils.getTupleForwarder(configuration, 
feedLogManager), dataParser,
@@ -83,19 +82,18 @@
                             if (isChangeFeed) {
                                 int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
                                 return new 
ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
-                                        numOfKeys + 2, 
(IRecordWithMetadataParser) dataParser, recordReader,
-                                        sendMarker);
+                                        numOfKeys + 2, 
(IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
                                 return new FeedWithMetaDataFlowController(ctx, 
tupleForwarder, feedLogManager, 2,
-                                        (IRecordWithMetadataParser) 
dataParser, recordReader, sendMarker);
+                                        (IRecordWithMetadataParser) 
dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
                             return new ChangeFeedDataFlowController(ctx, 
tupleForwarder, feedLogManager, numOfKeys + 1,
-                                    (IRecordWithPKDataParser) dataParser, 
recordReader, sendMarker);
+                                    (IRecordWithPKDataParser) dataParser, 
recordReader);
                         } else {
                             return new FeedRecordDataFlowController(ctx, 
tupleForwarder, feedLogManager, 1, dataParser,
-                                    recordReader, sendMarker);
+                                    recordReader);
                         }
                     } else {
                         return new RecordDataFlowController(ctx,
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a89d13e..3b6e7ff 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -41,8 +41,6 @@
     public static final String KEY_FILESYSTEM = "fs";
     // specifies the address of the HDFS name node
     public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
-    // specifies whether a feed sends progress markers or not
-    public static final String KEY_SEND_MARKER = "send-marker";
     // specifies the class implementation of the accessed instance of HDFS
     public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
     public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index d009960..88d00d0 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -162,7 +162,7 @@
     private static Map<ATypeTag, IValueParserFactory> valueParserFactoryMap = 
initializeValueParserFactoryMap();
 
     private static Map<ATypeTag, IValueParserFactory> 
initializeValueParserFactoryMap() {
-        Map<ATypeTag, IValueParserFactory> m = new HashMap<ATypeTag, 
IValueParserFactory>();
+        Map<ATypeTag, IValueParserFactory> m = new HashMap<>();
         m.put(ATypeTag.INT32, IntegerParserFactory.INSTANCE);
         m.put(ATypeTag.FLOAT, FloatParserFactory.INSTANCE);
         m.put(ATypeTag.DOUBLE, DoubleParserFactory.INSTANCE);
@@ -338,9 +338,5 @@
             intIndicators[i] = Integer.parseInt(stringIndicators[i]);
         }
         return intIndicators;
-    }
-
-    public static boolean isSendMarker(Map<String, String> configuration) {
-        return 
Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
index d286ff9..f796a73 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapterFactory.java
@@ -81,8 +81,8 @@
                 IPropertiesProvider propertiesProvider =
                         (IPropertiesProvider) ((NodeControllerService) ctx
                                 
.getJobletContext().getApplicationContext().getControllerService())
-                                        .getApplicationContext()
-                                        .getApplicationObject();
+                        .getApplicationContext()
+                        .getApplicationObject();
                 ClusterPartition nodePartition = 
propertiesProvider.getMetadataProperties().getNodePartitions()
                         .get(nodeId)[0];
                 parser = new ADMDataParser(outputType, true);
@@ -145,4 +145,9 @@
         return null;
     }
 
+    @Override
+    public IExternalDataSourceFactory getDataSourceFactory() {
+        return null;
+    }
+
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..8f80416 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -404,7 +404,7 @@
 
     protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildLoadableDatasetScan(
             JobSpecification jobSpec, IAdapterFactory adapterFactory, 
RecordDescriptor rDesc)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         ExternalScanOperatorDescriptor dataScanner =
                 new ExternalScanOperatorDescriptor(jobSpec, rDesc, 
adapterFactory);
         AlgebricksPartitionConstraint constraint;
@@ -447,7 +447,7 @@
                 break;
             case EXTERNAL:
                 String libraryName = primaryFeed.getAdapterName().trim()
-                        
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
+                
.split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0];
                 feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, 
primaryFeed, libraryName,
                         adapterFactory.getClass().getName(), recordType, 
policyAccessor, factoryOutput.second);
                 break;
@@ -754,36 +754,36 @@
             String indexName = primaryIndex.getIndexName();
             ARecordType metaType = dataset.hasMetaPart()
                     ? (ARecordType) 
findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
-                    : null;
-            String itemTypeName = dataset.getItemTypeName();
-            ARecordType itemType = (ARecordType) MetadataManager.INSTANCE
-                    .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), 
itemTypeName).getDatatype();
-            ITypeTraits[] typeTraits = 
DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
-            IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
-                    itemType, metaType, 
context.getBinaryComparatorFactoryProvider());
+                            : null;
+                    String itemTypeName = dataset.getItemTypeName();
+                    ARecordType itemType = (ARecordType) 
MetadataManager.INSTANCE
+                            .getDatatype(mdTxnCtx, 
dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
+                    ITypeTraits[] typeTraits = 
DatasetUtil.computeTupleTypeTraits(dataset, itemType, null);
+                    IBinaryComparatorFactory[] comparatorFactories = 
DatasetUtil.computeKeysBinaryComparatorFactories(dataset,
+                            itemType, metaType, 
context.getBinaryComparatorFactoryProvider());
 
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    
getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), 
datasetName, indexName,
-                            temp);
-            IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-            long numElementsHint = getCardinalityPerPartitionHint(dataset);
+                    Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
+                            
getSplitProviderAndConstraints(dataSource.getId().getDataverseName(), 
datasetName, indexName,
+                                    temp);
+                    IApplicationContextInfo appContext = 
(IApplicationContextInfo) context.getAppContext();
+                    long numElementsHint = 
getCardinalityPerPartitionHint(dataset);
 
-            // TODO
-            // figure out the right behavior of the bulkload and then give the
-            // right callback
-            // (ex. what's the expected behavior when there is an error during
-            // bulkload?)
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
-                    new TreeIndexBulkLoadOperatorDescriptor(spec, null, 
appContext.getStorageManager(),
-                            appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                            comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
-                            GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false,
-                            numElementsHint, true, 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                                    metaType, compactionInfo.first, 
compactionInfo.second),
-                            metadataPageManagerFactory);
-            return new Pair<>(btreeBulkLoad, splitsAndConstraint.second);
+                    // TODO
+                    // figure out the right behavior of the bulkload and then give the
+                    // right callback
+                    // (ex. what's the expected behavior when there is an error during
+                    // bulkload?)
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> 
compactionInfo =
+                            DatasetUtil.getMergePolicyFactory(dataset, 
mdTxnCtx);
+                    TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad =
+                            new TreeIndexBulkLoadOperatorDescriptor(spec, 
null, appContext.getStorageManager(),
+                                    
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                    comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
+                                    GlobalConfig.DEFAULT_TREE_FILL_FACTOR, 
false,
+                                    numElementsHint, true, 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                                            metaType, compactionInfo.first, 
compactionInfo.second),
+                                    metadataPageManagerFactory);
+                    return new Pair<>(btreeBulkLoad, 
splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
@@ -826,7 +826,7 @@
             IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, 
List<LogicalVariable> primaryKeys,
             List<LogicalVariable> secondaryKeys, List<LogicalVariable> 
additionalNonKeyFields,
             ILogicalExpression filterExpr, RecordDescriptor recordDesc, 
JobGenContext context, JobSpecification spec)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, 
dataSourceIndex, propagatedSchema,
                 inputSchemas, typeEnv, primaryKeys, secondaryKeys, 
additionalNonKeyFields, filterExpr, recordDesc,
                 context, spec, false, null, null);
@@ -986,7 +986,7 @@
 
     public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitProviderAndPartitionConstraintsForFilesIndex(
             String dataverseName, String datasetName, String targetIdxName, 
boolean create)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         return 
SplitsAndConstraintsUtil.getFilesIndexSplitProviderAndConstraints(mdTxnCtx, 
dataverseName, datasetName,
                 targetIdxName, create);
     }
@@ -1004,9 +1004,9 @@
             if (dataset.hasMetaPart()) {
                 metaType =
                         (ARecordType) MetadataManager.INSTANCE
-                                
.getDatatype(metadataProvider.getMetadataTxnContext(),
-                                        
dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName())
-                                .getDatatype();
+                        .getDatatype(metadataProvider.getMetadataTxnContext(),
+                                dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName())
+                        .getDatatype();
             }
             ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
             LookupAdapterFactory<?> adapterFactory =
@@ -1117,64 +1117,64 @@
             IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             primaryKeyFields, txnSubsystemProvider, 
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
-                    : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
-                            IndexOperation.UPSERT, ResourceType.LSM_BTREE, 
dataset.hasMetaPart());
+                            : new UpsertOperationCallbackFactory(jobId, 
datasetId, primaryKeyFields, txnSubsystemProvider,
+                                    IndexOperation.UPSERT, 
ResourceType.LSM_BTREE, dataset.hasMetaPart());
 
-            LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
-                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
+                    LockThenSearchOperationCallbackFactory 
searchCallbackFactory = new LockThenSearchOperationCallbackFactory(
+                            jobId, datasetId, primaryKeyFields, 
txnSubsystemProvider, ResourceType.LSM_BTREE);
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                    metaItemType, compactionInfo.first, compactionInfo.second);
-            LSMTreeUpsertOperatorDescriptor op;
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> 
compactionInfo =
+                            DatasetUtil.getMergePolicyFactory(dataset, 
mdTxnCtx);
+                    IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                            metaItemType, compactionInfo.first, 
compactionInfo.second);
+                    LSMTreeUpsertOperatorDescriptor op;
 
-            ITypeTraits[] outputTypeTraits =
-                    new ITypeTraits[recordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
-            ISerializerDeserializer[] outputSerDes = new 
ISerializerDeserializer[recordDesc.getFieldCount()
-                    + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+                    ITypeTraits[] outputTypeTraits =
+                            new ITypeTraits[recordDesc.getFieldCount() + 
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+                    ISerializerDeserializer[] outputSerDes = new 
ISerializerDeserializer[recordDesc.getFieldCount()
+                                                                               
          + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
 
-            // add the previous record first
-            int f = 0;
-            outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
-            f++;
-            // add the previous meta second
-            if (dataset.hasMetaPart()) {
-                outputSerDes[f] =
-                        
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
-                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
-                f++;
-            }
-            // add the previous filter third
-            int fieldIdx = -1;
-            if (numFilterFields > 0) {
-                String filterField = 
DatasetUtil.getFilterField(dataset).get(0);
-                for (i = 0; i < itemType.getFieldNames().length; i++) {
-                    if (itemType.getFieldNames()[i].equals(filterField)) {
-                        break;
+                    // add the previous record first
+                    int f = 0;
+                    outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+                    f++;
+                    // add the previous meta second
+                    if (dataset.hasMetaPart()) {
+                        outputSerDes[f] =
+                                
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+                        outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+                        f++;
                     }
-                }
-                fieldIdx = i;
-                outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
-                        .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
-                outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
-                        
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
-                f++;
-            }
-            for (int j = 0; j < recordDesc.getFieldCount(); j++) {
-                outputTypeTraits[j + f] = recordDesc.getTypeTraits()[j];
-                outputSerDes[j + f] = recordDesc.getFields()[j];
-            }
+                    // add the previous filter third
+                    int fieldIdx = -1;
+                    if (numFilterFields > 0) {
+                        String filterField = 
DatasetUtil.getFilterField(dataset).get(0);
+                        for (i = 0; i < itemType.getFieldNames().length; i++) {
+                            if 
(itemType.getFieldNames()[i].equals(filterField)) {
+                                break;
+                            }
+                        }
+                        fieldIdx = i;
+                        outputTypeTraits[f] = 
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+                                
.getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+                        outputSerDes[f] = 
FormatUtils.getDefaultFormat().getSerdeProvider()
+                                
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+                        f++;
+                    }
+                    for (int j = 0; j < recordDesc.getFieldCount(); j++) {
+                        outputTypeTraits[j + f] = 
recordDesc.getTypeTraits()[j];
+                        outputSerDes[j + f] = recordDesc.getFields()[j];
+                    }
 
-            RecordDescriptor outputRecordDesc = new 
RecordDescriptor(outputSerDes, outputTypeTraits);
-            op = new LSMTreeUpsertOperatorDescriptor(spec, outputRecordDesc, 
appContext.getStorageManager(),
-                    appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                    comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, null, true, indexName,
-                    context.getMissingWriterFactory(), 
modificationCallbackFactory, searchCallbackFactory, null,
-                    metadataPageManagerFactory);
-            op.setType(itemType);
-            op.setFilterIndex(fieldIdx);
-            return new Pair<>(op, splitsAndConstraint.second);
+                    RecordDescriptor outputRecordDesc = new 
RecordDescriptor(outputSerDes, outputTypeTraits);
+                    op = new LSMTreeUpsertOperatorDescriptor(spec, 
outputRecordDesc, appContext.getStorageManager(),
+                            appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                            comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, null, true, indexName,
+                            context.getMissingWriterFactory(), 
modificationCallbackFactory, searchCallbackFactory, null,
+                            metadataPageManagerFactory);
+                    op.setType(itemType);
+                    op.setFilterIndex(fieldIdx);
+                    return new Pair<>(op, splitsAndConstraint.second);
 
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
@@ -1183,7 +1183,7 @@
 
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildExternalDatasetDataScannerRuntime(
             JobSpecification jobSpec, IAType itemType, IAdapterFactory 
adapterFactory, IDataFormat format)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         if (itemType.getTypeTag() != ATypeTag.RECORD) {
             throw new AlgebricksException("Can only scan datasets of 
records.");
         }
@@ -1233,7 +1233,7 @@
                 switch (dsType) {
                     case INTERNAL:
                         keyType = (hasMeta && 
primaryIndexKeyIndicators.get(j).intValue() == 1)
-                                ? 
metaType.getSubFieldType(pidxKeyFieldNames.get(j))
+                        ? metaType.getSubFieldType(pidxKeyFieldNames.get(j))
                                 : 
recType.getSubFieldType(pidxKeyFieldNames.get(j));
                         break;
                     case EXTERNAL:
@@ -1269,7 +1269,7 @@
         int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
         // Move key fields to front.
         int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
-                + (additionalNonFilteringFields == null ? 0 : 
additionalNonFilteringFields.size())];
+                                         + (additionalNonFilteringFields == 
null ? 0 : additionalNonFilteringFields.size())];
         int[] bloomFilterKeyFields = new int[numKeys];
         int i = 0;
         for (LogicalVariable varKey : keys) {
@@ -1317,31 +1317,31 @@
                     ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(
                             ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
                             primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, indexOp, 
ResourceType.LSM_BTREE,
-                            dataset.hasMetaPart());
+                            : new 
PrimaryIndexModificationOperationCallbackFactory(
+                                    ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
+                                    primaryKeyFields, txnSubsystemProvider, 
indexOp, ResourceType.LSM_BTREE,
+                                    dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
-                    metaItemType, compactionInfo.first, compactionInfo.second);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, true, 
numElementsHint, true, idfh,
-                        metadataPageManagerFactory);
-            } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, null, true,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        metadataPageManagerFactory);
-            }
-            return new Pair<>(op, splitsAndConstraint.second);
+                            Pair<ILSMMergePolicyFactory, Map<String, String>> 
compactionInfo =
+                                    DatasetUtil.getMergePolicyFactory(dataset, 
mdTxnCtx);
+                            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, primaryIndex, itemType,
+                                    metaItemType, compactionInfo.first, 
compactionInfo.second);
+                            IOperatorDescriptor op;
+                            if (bulkload) {
+                                long numElementsHint = 
getCardinalityPerPartitionHint(dataset);
+                                op = new 
TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                        
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                        comparatorFactories, 
bloomFilterKeyFields, fieldPermutation,
+                                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, 
true, numElementsHint, true, idfh,
+                                        metadataPageManagerFactory);
+                            } else {
+                                op = new 
LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                        
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                        comparatorFactories, 
bloomFilterKeyFields, fieldPermutation, indexOp, idfh, null, true,
+                                        indexName, null, 
modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                                        metadataPageManagerFactory);
+                            }
+                            return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException me) {
             throw new AlgebricksException(me);
         }
@@ -1465,86 +1465,86 @@
             ARecordType metaType = dataset.hasMetaPart()
                     ? (ARecordType) 
MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
                             dataset.getMetaItemTypeDataverseName(), 
dataset.getMetaItemTypeName()).getDatatype()
-                    : null;
+                            : null;
 
-            // Index parameters.
-            Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
-                    dataset.getDatasetName(), indexName);
+                            // Index parameters.
+                            Index secondaryIndex = 
MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
+                                    dataset.getDatasetName(), indexName);
 
-            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, itemType);
-            int[] filterFields;
-            int[] btreeFields;
-            if (filterTypeTraits != null) {
-                filterFields = new int[1];
-                filterFields[0] = numKeys;
-                btreeFields = new int[numKeys];
-                for (int k = 0; k < btreeFields.length; k++) {
-                    btreeFields[k] = k;
-                }
-            }
+                            ITypeTraits[] filterTypeTraits = 
DatasetUtil.computeFilterTypeTraits(dataset, itemType);
+                            int[] filterFields;
+                            int[] btreeFields;
+                            if (filterTypeTraits != null) {
+                                filterFields = new int[1];
+                                filterFields[0] = numKeys;
+                                btreeFields = new int[numKeys];
+                                for (int k = 0; k < btreeFields.length; k++) {
+                                    btreeFields[k] = k;
+                                }
+                            }
 
-            List<List<String>> secondaryKeyNames = 
secondaryIndex.getKeyFieldNames();
-            List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes();
-            ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
-            IBinaryComparatorFactory[] comparatorFactories = new 
IBinaryComparatorFactory[numKeys];
-            for (i = 0; i < secondaryKeys.size(); ++i) {
-                Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
-                        secondaryKeyNames.get(i), itemType);
-                IAType keyType = keyPairType.first;
-                comparatorFactories[i] =
-                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
-                typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-            }
-            List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
-            for (List<String> partitioningKey : partitioningKeys) {
-                IAType keyType = itemType.getSubFieldType(partitioningKey);
-                comparatorFactories[i] =
-                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
-                typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
-                ++i;
-            }
+                            List<List<String>> secondaryKeyNames = 
secondaryIndex.getKeyFieldNames();
+                            List<IAType> secondaryKeyTypes = 
secondaryIndex.getKeyFieldTypes();
+                            ITypeTraits[] typeTraits = new 
ITypeTraits[numKeys];
+                            IBinaryComparatorFactory[] comparatorFactories = 
new IBinaryComparatorFactory[numKeys];
+                            for (i = 0; i < secondaryKeys.size(); ++i) {
+                                Pair<IAType, Boolean> keyPairType = 
Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(i),
+                                        secondaryKeyNames.get(i), itemType);
+                                IAType keyType = keyPairType.first;
+                                comparatorFactories[i] =
+                                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
+                                typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                            }
+                            List<List<String>> partitioningKeys = 
DatasetUtil.getPartitioningKeys(dataset);
+                            for (List<String> partitioningKey : 
partitioningKeys) {
+                                IAType keyType = 
itemType.getSubFieldType(partitioningKey);
+                                comparatorFactories[i] =
+                                        
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, 
true);
+                                typeTraits[i] = 
TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
+                                ++i;
+                            }
 
-            IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
-            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
-                    getSplitProviderAndConstraints(dataverseName, datasetName, 
indexName, temp);
+                            IApplicationContextInfo appContext = 
(IApplicationContextInfo) context.getAppContext();
+                            Pair<IFileSplitProvider, 
AlgebricksPartitionConstraint> splitsAndConstraint =
+                                    
getSplitProviderAndConstraints(dataverseName, datasetName, indexName, temp);
 
-            // prepare callback
-            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_BTREE, dataset.hasMetaPart());
+                            // prepare callback
+                            JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
+                            int datasetId = dataset.getDatasetId();
+                            IModificationOperationCallbackFactory 
modificationCallbackFactory = temp
+                                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                            
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                                            ResourceType.LSM_BTREE)
+                                            : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                                    
modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+                                                    ResourceType.LSM_BTREE, 
dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
-                    metaType, compactionInfo.first, compactionInfo.second);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false, idfh,
-                        metadataPageManagerFactory);
-            } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
-            } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        metadataPageManagerFactory);
-            }
-            return new Pair<>(op, splitsAndConstraint.second);
+                                    Pair<ILSMMergePolicyFactory, Map<String, 
String>> compactionInfo =
+                                            
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+                                    IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
+                                            metaType, compactionInfo.first, 
compactionInfo.second);
+                                    IOperatorDescriptor op;
+                                    if (bulkload) {
+                                        long numElementsHint = 
getCardinalityPerPartitionHint(dataset);
+                                        op = new 
TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                                
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                                comparatorFactories, 
bloomFilterKeyFields, fieldPermutation,
+                                                
GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh,
+                                                metadataPageManagerFactory);
+                                    } else if (indexOp == 
IndexOperation.UPSERT) {
+                                        op = new 
LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                                
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                                comparatorFactories, 
bloomFilterKeyFields, fieldPermutation, idfh, filterFactory, false,
+                                                indexName, null, 
modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                                                prevFieldPermutation, 
metadataPageManagerFactory);
+                                    } else {
+                                        op = new 
LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                                
appContext.getIndexLifecycleManagerProvider(), splitsAndConstraint.first, 
typeTraits,
+                                                comparatorFactories, 
bloomFilterKeyFields, fieldPermutation, indexOp, idfh, filterFactory,
+                                                false, indexName, null, 
modificationCallbackFactory, NoOpOperationCallbackFactory.INSTANCE,
+                                                metadataPageManagerFactory);
+                                    }
+                                    return new Pair<>(op, 
splitsAndConstraint.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -1673,36 +1673,36 @@
                     ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
                             ResourceType.LSM_RTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_RTREE, dataset.hasMetaPart());
+                            : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                    modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
+                                    ResourceType.LSM_RTREE, 
dataset.hasMetaPart());
 
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
-                    secondaryIndex, recType, metaItemType, 
compactionInfo.first, compactionInfo.second);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new TreeIndexBulkLoadOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        primaryComparatorFactories, btreeFields, 
fieldPermutation,
-                        GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false,
-                        indexDataflowHelperFactory, 
metadataPageManagerFactory);
-            } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, 
indexDataflowHelperFactory, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory);
-            } else {
-                op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
-                        appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
-                        comparatorFactories, null, fieldPermutation, indexOp, 
indexDataflowHelperFactory,
-                        filterFactory, false, indexName, null, 
modificationCallbackFactory,
-                        NoOpOperationCallbackFactory.INSTANCE, 
metadataPageManagerFactory);
-            }
-            return new Pair<>(op, splitsAndConstraint.second);
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> 
compactionInfo =
+                            DatasetUtil.getMergePolicyFactory(dataset, 
mdTxnCtx);
+                    IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
+                            secondaryIndex, recType, metaItemType, 
compactionInfo.first, compactionInfo.second);
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = 
getCardinalityPerPartitionHint(dataset);
+                        op = new TreeIndexBulkLoadOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
+                                appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                                primaryComparatorFactories, btreeFields, 
fieldPermutation,
+                                GlobalConfig.DEFAULT_TREE_FILL_FACTOR, false, 
numElementsHint, false,
+                                indexDataflowHelperFactory, 
metadataPageManagerFactory);
+                    } else if (indexOp == IndexOperation.UPSERT) {
+                        op = new LSMTreeUpsertOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
+                                appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                                comparatorFactories, null, fieldPermutation, 
indexDataflowHelperFactory, filterFactory, false,
+                                indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                                prevFieldPermutation, 
metadataPageManagerFactory);
+                    } else {
+                        op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
+                                appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
+                                comparatorFactories, null, fieldPermutation, 
indexOp, indexDataflowHelperFactory,
+                                filterFactory, false, indexName, null, 
modificationCallbackFactory,
+                                NoOpOperationCallbackFactory.INSTANCE, 
metadataPageManagerFactory);
+                    }
+                    return new Pair<>(op, splitsAndConstraint.second);
         } catch (MetadataException e) {
             throw new AlgebricksException(e);
         }
@@ -1714,7 +1714,7 @@
             AsterixTupleFilterFactory filterFactory, RecordDescriptor 
recordDesc, JobGenContext context,
             JobSpecification spec, IndexOperation indexOp, IndexType 
indexType, boolean bulkload,
             List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> 
prevAdditionalFilteringKeys)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         // Check the index is length-partitioned or not.
         boolean isPartitioned;
         if (indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
@@ -1883,36 +1883,36 @@
                     ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
                             modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
                             ResourceType.LSM_INVERTED_INDEX)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
-                            ResourceType.LSM_INVERTED_INDEX, 
dataset.hasMetaPart());
-            Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
-                    DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            IIndexDataflowHelperFactory indexDataFlowFactory = 
dataset.getIndexDataflowHelperFactory(this,
-                    secondaryIndex, recType, metaItemType, 
compactionInfo.first, compactionInfo.second);
-            IOperatorDescriptor op;
-            if (bulkload) {
-                long numElementsHint = getCardinalityPerPartitionHint(dataset);
-                op = new LSMInvertedIndexBulkLoadOperatorDescriptor(spec, 
recordDesc, fieldPermutation, false,
-                        numElementsHint, false, 
appContext.getStorageManager(), splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, 
tokenizerFactory, indexDataFlowFactory,
-                        metadataPageManagerFactory);
-            } else if (indexOp == IndexOperation.UPSERT) {
-                op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
-                        splitsAndConstraint.first, 
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
-                        tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
-                        fieldPermutation, indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
-                        prevFieldPermutation, metadataPageManagerFactory);
-            } else {
-                op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, 
recordDesc,
-                        appContext.getStorageManager(), 
splitsAndConstraint.first,
-                        appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
-                        invListsTypeTraits, invListComparatorFactories, 
tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
-                        metadataPageManagerFactory);
-            }
-            return new Pair<>(op, splitsAndConstraint.second);
+                            : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                    modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, indexOp,
+                                    ResourceType.LSM_INVERTED_INDEX, 
dataset.hasMetaPart());
+                    Pair<ILSMMergePolicyFactory, Map<String, String>> 
compactionInfo =
+                            DatasetUtil.getMergePolicyFactory(dataset, 
mdTxnCtx);
+                    IIndexDataflowHelperFactory indexDataFlowFactory = 
dataset.getIndexDataflowHelperFactory(this,
+                            secondaryIndex, recType, metaItemType, 
compactionInfo.first, compactionInfo.second);
+                    IOperatorDescriptor op;
+                    if (bulkload) {
+                        long numElementsHint = 
getCardinalityPerPartitionHint(dataset);
+                        op = new 
LSMInvertedIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, 
false,
+                                numElementsHint, false, 
appContext.getStorageManager(), splitsAndConstraint.first,
+                                appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
+                                invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory, indexDataFlowFactory,
+                                metadataPageManagerFactory);
+                    } else if (indexOp == IndexOperation.UPSERT) {
+                        op = new 
LSMInvertedIndexUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
+                                splitsAndConstraint.first, 
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
+                                tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
+                                fieldPermutation, indexDataFlowFactory, 
filterFactory, modificationCallbackFactory, indexName,
+                                prevFieldPermutation, 
metadataPageManagerFactory);
+                    } else {
+                        op = new 
LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, recordDesc,
+                                appContext.getStorageManager(), 
splitsAndConstraint.first,
+                                appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
+                                invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory, fieldPermutation, indexOp,
+                                indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
+                                metadataPageManagerFactory);
+                    }
+                    return new Pair<>(op, splitsAndConstraint.second);
         } catch (Exception e) {
             throw new AlgebricksException(e);
         }
@@ -2105,7 +2105,7 @@
 
     private AsterixTupleFilterFactory 
createTupleFilterFactory(IOperatorSchema[] inputSchemas,
             IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, 
JobGenContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         // No filtering condition.
         if (filterExpr == null) {
             return null;
@@ -2125,4 +2125,16 @@
     public IStorageComponentProvider getStorageComponentProvider() {
         return storaegComponentProvider;
     }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds)
+            throws AlgebricksException {
+        return getSplitProviderAndConstraints(ds.getDataverseName(), 
ds.getDatasetName(), ds.getDatasetName(),
+                ds.getDatasetDetails().isTemp());
+    }
+
+    public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
getSplitProviderAndConstraints(Dataset ds,
+            Index index) throws AlgebricksException {
+        return getSplitProviderAndConstraints(ds.getDataverseName(), 
ds.getDatasetName(), index.getIndexName(),
+                ds.getDatasetDetails().isTemp());
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..c72129f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,7 +29,6 @@
 import org.apache.asterix.active.ActiveJobNotificationHandler;
 import org.apache.asterix.active.IActiveEntityEventsListener;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
@@ -37,6 +36,7 @@
 import 
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
 import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.JobUtils;
@@ -433,9 +433,9 @@
         switch (index.getIndexType()) {
             case BTREE:
                 return getDatasetType() == DatasetType.EXTERNAL
-                        && 
!index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this))
-                                ? 
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
-                                : LSMBTreeIOOperationCallbackFactory.INSTANCE;
+                && 
!index.getIndexName().equals(BTreeDataflowHelperFactoryProvider.externalFileIndexName(this))
+                ? LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
+                        : LSMBTreeIOOperationCallbackFactory.INSTANCE;
             case RTREE:
                 return LSMRTreeIOOperationCallbackFactory.INSTANCE;
             case LENGTH_PARTITIONED_NGRAM_INVIX:
@@ -489,8 +489,8 @@
             return (op == IndexOperation.UPSERT)
                     ? new LockThenSearchOperationCallbackFactory(jobId, 
getDatasetId(), primaryKeyFields,
                             
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE)
-                    : new 
PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
-                            
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE);
+                            : new 
PrimaryIndexInstantSearchOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
+                                    
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE);
         }
         return new SecondaryIndexSearchOperationCallbackFactory();
     }
@@ -522,17 +522,17 @@
                     ? new UpsertOperationCallbackFactory(jobId, 
getDatasetId(), primaryKeyFields,
                             
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
                             hasMetaPart())
-                    : op == IndexOperation.DELETE || op == 
IndexOperation.INSERT
+                            : op == IndexOperation.DELETE || op == 
IndexOperation.INSERT
                             ? new 
PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
                                     primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), op,
                                     index.resourceType(), hasMetaPart())
-                            : NoOpOperationCallbackFactory.INSTANCE;
+                                    : NoOpOperationCallbackFactory.INSTANCE;
         } else {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
                     ? new 
SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(), 
primaryKeyFields,
                             
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
                             hasMetaPart())
-                    : NoOpOperationCallbackFactory.INSTANCE;
+                            : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
 
@@ -577,4 +577,27 @@
                 metadataProvider.isTemporaryDatasetWriteJob(), 
metadataProvider.isWriteTransaction(), upsertVarIdx,
                 datasetPartitions, isSink);
     }
+
+    /**
+     * Get the index dataflow helper factory for the dataset's primary index
+     *
+     * @param mdProvider
+     *            an instance of metadata provider that is used to fetch 
metadata information
+     * @throws AlgebricksException
+     */
+    public IIndexDataflowHelperFactory 
getIndexDataflowHelperFactory(MetadataProvider mdProvider)
+            throws AlgebricksException {
+        if (getDatasetType() != DatasetType.INTERNAL) {
+            throw new AlgebricksException("Only Internal datasets have 
dataflow helper factory");
+        }
+        Index index = 
MetadataManager.INSTANCE.getIndex(mdProvider.getMetadataTxnContext(), 
getDataverseName(),
+                getDatasetName(), getDatasetName());
+        ARecordType recordType = (ARecordType) 
mdProvider.findType(getItemTypeDataverseName(), getItemTypeName());
+        ARecordType metaType = (ARecordType) 
mdProvider.findType(getMetaItemTypeDataverseName(), getMetaItemTypeName());
+        Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
+                DatasetUtil.getMergePolicyFactory(this, 
mdProvider.getMetadataTxnContext());
+        return getIndexDataflowHelperFactory(mdProvider, index, recordType, 
metaType, compactionInfo.first,
+                compactionInfo.second);
+    }
+
 }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
index 2fae304..190a3b2 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/nonvisitor/ARecordPointable.java
@@ -71,6 +71,7 @@
 public class ARecordPointable extends AbstractPointable {
 
     private final UTF8StringWriter utf8Writer = new UTF8StringWriter();
+    public static final ARecordPointableFactory FACTORY = new 
ARecordPointableFactory();
 
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
@@ -86,11 +87,15 @@
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class ARecordPointableFactory implements IPointableFactory {
+
         private static final long serialVersionUID = 1L;
 
+        private ARecordPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public ARecordPointable createPointable() {
             return new ARecordPointable();
         }
 
@@ -98,7 +103,8 @@
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+
+    }
 
     public static final IObjectFactory<IPointable, ATypeTag> ALLOCATOR = new 
IObjectFactory<IPointable, ATypeTag>() {
         @Override
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index d38c5b7..510664b 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -32,7 +32,7 @@
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.utils.TransactionUtil;
 import 
org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
-import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -46,7 +46,7 @@
 
 public class CommitRuntime extends 
AbstractOneInputOneOutputOneFramePushRuntime {
 
-    private final static long SEED = 0L;
+    protected static final long SEED = 0L;
 
     protected final ITransactionManager transactionManager;
     protected final ILogManager logMgr;
@@ -85,8 +85,7 @@
         try {
             transactionContext = 
transactionManager.getTransactionContext(jobId, false);
             transactionContext.setWriteTxn(isWriteTransaction);
-            ILogMarkerCallback callback =
-                    
TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
+            ILogMarkerCallback callback = 
TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
             if (isSink) {
                 return;
@@ -126,7 +125,7 @@
                 }
             }
         }
-        VSizeFrame message = 
TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx);
+        IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
         if (message != null
                 && MessagingFrameTupleAppender.getMessageType(message) == 
MessagingFrameTupleAppender.MARKER_MESSAGE) {
             try {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..cfe2a25 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -29,14 +29,14 @@
 
     private static final long serialVersionUID = 1L;
 
-    private final JobId jobId;
-    private final int datasetId;
-    private final int[] primaryKeyFields;
-    private final boolean isTemporaryDatasetWriteJob;
-    private final boolean isWriteTransaction;
-    private final int upsertVarIdx;
-    private int[] datasetPartitions;
-    private final boolean isSink;
+    protected final JobId jobId;
+    protected final int datasetId;
+    protected final int[] primaryKeyFields;
+    protected final boolean isTemporaryDatasetWriteJob;
+    protected final boolean isWriteTransaction;
+    protected final int upsertVarIdx;
+    protected int[] datasetPartitions;
+    protected final boolean isSink;
 
     public CommitRuntimeFactory(JobId jobId, int datasetId, int[] 
primaryKeyFields, boolean isTemporaryDatasetWriteJob,
             boolean isWriteTransaction, int upsertVarIdx, int[] 
datasetPartitions, boolean isSink) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
index 9b2fe36..b555471 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
@@ -27,7 +27,7 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 
 public class UpsertCommitRuntime extends CommitRuntime {
-    private final int upsertIdx;
+    protected final int upsertIdx;
 
     public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int 
datasetId, int[] primaryKeyFields,
             boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, 
int resourcePartition, int upsertIdx,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
index 06538af..80dd19b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/VSizeFrame.java
@@ -43,6 +43,7 @@
         buffer = ctx.allocateFrame(frameSize);
     }
 
+    @Override
     public ByteBuffer getBuffer() {
         return buffer;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
index 346f934..19d2c6d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java
@@ -459,4 +459,8 @@
     public ThreadDumpRun removeThreadDumpRun(String requestKey) {
         return threadDumpRunMap.remove(requestKey);
     }
+
+    public ICCApplicationEntryPoint getApplication() {
+        return aep;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 05417a8..ec9d083 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,6 +25,12 @@
 
     protected int length;
 
+    public byte[] copy() {
+        byte[] copy = new byte[length];
+        System.arraycopy(bytes, start, copy, 0, length);
+        return copy;
+    }
+
     @Override
     public void set(byte[] bytes, int start, int length) {
         this.bytes = bytes;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
index 74ced4f..2e8071c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/IPointable.java
@@ -18,8 +18,27 @@
  */
 package org.apache.hyracks.data.std.api;
 
+/**
+ * Point to a range over a byte array
+ */
 public interface IPointable extends IValueReference {
-    public void set(byte[] bytes, int start, int length);
+    /**
+     * Point to the range from position = start with length = length over the 
byte array bytes
+     *
+     * @param bytes
+     *            the byte array
+     * @param start
+     *            the start offset
+     * @param length
+     *            the length of the range
+     */
+    void set(byte[] bytes, int start, int length);
 
-    public void set(IValueReference pointer);
+    /**
+     * Point to the same range pointed to by the passed pointer
+     *
+     * @param pointer
+     *            the pointer to the targeted range
+     */
+    void set(IValueReference pointer);
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
index ee00163..51c155e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/VoidPointable.java
@@ -20,10 +20,10 @@
 
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.data.std.api.AbstractPointable;
-import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
 
 public final class VoidPointable extends AbstractPointable {
+    public static final VoidPointableFactory FACTORY = new 
VoidPointableFactory();
     public static final ITypeTraits TYPE_TRAITS = new ITypeTraits() {
         private static final long serialVersionUID = 1L;
 
@@ -38,11 +38,14 @@
         }
     };
 
-    public static final IPointableFactory FACTORY = new IPointableFactory() {
+    public static class VoidPointableFactory implements IPointableFactory {
         private static final long serialVersionUID = 1L;
 
+        private VoidPointableFactory() {
+        }
+
         @Override
-        public IPointable createPointable() {
+        public VoidPointable createPointable() {
             return new VoidPointable();
         }
 
@@ -50,5 +53,5 @@
         public ITypeTraits getTypeTraits() {
             return TYPE_TRAITS;
         }
-    };
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index 57f8072..704df61 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -59,7 +59,7 @@
 
     protected boolean hasEnoughSpace(int fieldCount, int tupleLength) {
         return tupleDataEndOffset + FrameHelper.calcRequiredSpace(fieldCount, 
tupleLength)
-                + tupleCount * FrameConstants.SIZE_LEN <= 
FrameHelper.getTupleCountOffset(frame.getFrameSize());
+        + tupleCount * FrameConstants.SIZE_LEN <= 
FrameHelper.getTupleCountOffset(frame.getFrameSize());
     }
 
     protected void reset(ByteBuffer buffer, boolean clear) {
@@ -108,4 +108,11 @@
         return false;
     }
 
+    @Override
+    public void flush(IFrameWriter writer) throws HyracksDataException {
+        if (tupleCount > 0) {
+            write(writer, true);
+        }
+        writer.flush();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 8f005d8..9d98bb2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -22,8 +22,8 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.comm.FrameHelper;
+import org.apache.hyracks.api.comm.IFrame;
 import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -37,23 +37,23 @@
  */
 public class MessagingFrameTupleAppender extends FrameTupleAppender {
 
-    private final IHyracksTaskContext ctx;
-    private static final int NULL_MESSAGE_SIZE = 1;
+    protected final IHyracksTaskContext ctx;
+    protected static final int NULL_MESSAGE_SIZE = 1;
     public static final byte NULL_FEED_MESSAGE = 0x01;
     public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
     public static final byte MARKER_MESSAGE = 0x03;
-    private boolean initialized = false;
-    private VSizeFrame message;
+    protected boolean initialized = false;
+    protected IFrame message;
 
     public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
         this.ctx = ctx;
     }
 
-    public static void printMessage(VSizeFrame message, PrintStream out) 
throws HyracksDataException {
+    public static void printMessage(IFrame message, PrintStream out) throws 
HyracksDataException {
         out.println(getMessageString(message));
     }
 
-    public static String getMessageString(VSizeFrame message) throws 
HyracksDataException {
+    public static String getMessageString(IFrame message) throws 
HyracksDataException {
         StringBuilder aString = new StringBuilder();
         aString.append("Message Type: ");
         switch (getMessageType(message)) {
@@ -76,7 +76,7 @@
         return aString.toString();
     }
 
-    public static byte getMessageType(VSizeFrame message) throws 
HyracksDataException {
+    public static byte getMessageType(IFrame message) throws 
HyracksDataException {
         switch (message.getBuffer().array()[0]) {
             case NULL_FEED_MESSAGE:
                 return NULL_FEED_MESSAGE;
@@ -112,8 +112,7 @@
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
         if (!initialized) {
-            message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, 
ctx);
-            initialized = true;
+            init();
         }
         // If message fits, we append it, otherwise, we append a null message, 
then send a message only
         // frame with the message
@@ -125,7 +124,7 @@
         } else {
             ByteBuffer buffer = message.getBuffer();
             int messageSize = buffer.limit() - buffer.position();
-            if (hasEnoughSpace(1, messageSize)) {
+            if (hasEnoughSpace(0, messageSize)) {
                 appendMessage(buffer);
                 forward(outWriter);
             } else {
@@ -133,7 +132,7 @@
                     appendNullMessage();
                     forward(outWriter);
                 }
-                if (!hasEnoughSpace(1, messageSize)) {
+                if (!hasEnoughSpace(0, messageSize)) {
                     
frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, 
frame.getMinSize()));
                     reset(frame.getBuffer(), true);
                 }
@@ -143,14 +142,19 @@
         }
     }
 
-    private void forward(IFrameWriter outWriter) throws HyracksDataException {
+    protected void init() {
+        message = TaskUtil.<IFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+        initialized = true;
+    }
+
+    protected void forward(IFrameWriter outWriter) throws HyracksDataException 
{
         getBuffer().clear();
         outWriter.nextFrame(getBuffer());
         frame.reset();
         reset(getBuffer(), true);
     }
 
-    private void appendMessage(ByteBuffer message) {
+    protected void appendMessage(ByteBuffer message) {
         int messageLength = message.limit() - message.position();
         System.arraycopy(message.array(), message.position(), array, 
tupleDataEndOffset, messageLength);
         tupleDataEndOffset += messageLength;
@@ -160,7 +164,7 @@
         IntSerDeUtils.putInt(getBuffer().array(), 
FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount);
     }
 
-    private void appendNullMessage() {
+    protected void appendNullMessage() {
         array[tupleDataEndOffset] = NULL_FEED_MESSAGE;
         tupleDataEndOffset++;
         IntSerDeUtils.putInt(getBuffer().array(),
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
index 6d87d89..2eb1f94 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionDataWriter.java
@@ -33,14 +33,14 @@
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 
 public class PartitionDataWriter implements IFrameWriter {
-    private final int consumerPartitionCount;
-    private final IFrameWriter[] pWriters;
-    private final boolean[] isOpen;
-    private final FrameTupleAppender[] appenders;
-    private final FrameTupleAccessor tupleAccessor;
-    private final ITuplePartitionComputer tpc;
-    private final IHyracksTaskContext ctx;
-    private boolean[] allocatedFrames;
+    protected final int consumerPartitionCount;
+    protected final IFrameWriter[] pWriters;
+    protected final boolean[] isOpen;
+    protected final FrameTupleAppender[] appenders;
+    protected final FrameTupleAccessor tupleAccessor;
+    protected final ITuplePartitionComputer tpc;
+    protected final IHyracksTaskContext ctx;
+    protected boolean[] allocatedFrames;
 
     public PartitionDataWriter(IHyracksTaskContext ctx, int 
consumerPartitionCount, IPartitionWriterFactory pwFactory,
             RecordDescriptor recordDescriptor, ITuplePartitionComputer tpc) 
throws HyracksDataException {
@@ -49,6 +49,13 @@
         isOpen = new boolean[consumerPartitionCount];
         allocatedFrames = new boolean[consumerPartitionCount];
         appenders = new FrameTupleAppender[consumerPartitionCount];
+        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
+        this.tpc = tpc;
+        this.ctx = ctx;
+        initializeAppenders(pwFactory);
+    }
+
+    protected void initializeAppenders(IPartitionWriterFactory pwFactory) 
throws HyracksDataException {
         for (int i = 0; i < consumerPartitionCount; ++i) {
             try {
                 pWriters[i] = pwFactory.createFrameWriter(i);
@@ -57,9 +64,6 @@
                 throw new HyracksDataException(e);
             }
         }
-        tupleAccessor = new FrameTupleAccessor(recordDescriptor);
-        this.tpc = tpc;
-        this.ctx = ctx;
     }
 
     protected FrameTupleAppender createTupleAppender(IHyracksTaskContext ctx) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
similarity index 84%
rename from 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
rename to 
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
index 627994c..fc0ce9c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/ArrayValueReference.java
@@ -20,14 +20,10 @@
 
 import org.apache.hyracks.data.std.api.IValueReference;
 
-public class MutableArrayValueReference implements IValueReference {
+public class ArrayValueReference implements IValueReference {
     private byte[] array;
 
-    public MutableArrayValueReference() {
-        //mutable array. user doesn't need to specify the array in advance
-    }
-
-    public MutableArrayValueReference(byte[] array) {
+    public ArrayValueReference(byte[] array) {
         this.array = array;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
index fbb930d..3454ecb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/test/java/org/apache/hyracks/storage/am/common/frames/LIFOMetadataFrameTest.java
@@ -22,7 +22,7 @@
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.LongPointable;
-import 
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
 import org.apache.hyracks.storage.common.buffercache.VirtualPage;
 import org.junit.Assert;
 import org.junit.Test;
@@ -33,14 +33,14 @@
     public void test() throws HyracksDataException {
         LIFOMetaDataFrame frame = new LIFOMetaDataFrame();
         VirtualPage page = new VirtualPage(ByteBuffer.allocate(512), 512);
-        MutableArrayValueReference testKey = new 
MutableArrayValueReference("TestLSNKey".getBytes());
+        ArrayValueReference testKey = new 
ArrayValueReference("TestLSNKey".getBytes());
         frame.setPage(page);
         frame.init();
         LongPointable longPointable = (LongPointable) 
LongPointable.FACTORY.createPointable();
         frame.get(testKey, longPointable);
         Assert.assertNull(longPointable.getByteArray());
         byte[] longBytes = new byte[Long.BYTES];
-        MutableArrayValueReference value = new 
MutableArrayValueReference(longBytes);
+        ArrayValueReference value = new ArrayValueReference(longBytes);
         int space = frame.getSpace() - (value.getLength() + Integer.BYTES * 2
                 + testKey.getLength());
         for (long l = 1L; l < 52L; l++) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
index 7f8e990..100084d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java
@@ -25,7 +25,7 @@
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
-import 
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
 import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter;
 import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
@@ -34,7 +34,7 @@
 
 public class LSMComponentFilterManager implements ILSMComponentFilterManager {
 
-    public static final MutableArrayValueReference FILTER_KEY = new 
MutableArrayValueReference("Filter".getBytes());
+    public static final ArrayValueReference FILTER_KEY = new 
ArrayValueReference("Filter".getBytes());
     private final ILSMComponentFilterFrameFactory filterFrameFactory;
 
     public LSMComponentFilterManager(ILSMComponentFilterFrameFactory 
filterFrameFactory) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index b55e8ad..04d50df 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -24,7 +24,7 @@
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.LongPointable;
-import 
org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.freepage.ArrayValueReference;
 import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,8 +32,8 @@
 
 public class ComponentMetadataUtil {
 
-    public static final MutableArrayValueReference MARKER_LSN_KEY =
-            new MutableArrayValueReference("Marker".getBytes());
+    public static final ArrayValueReference MARKER_LSN_KEY =
+            new ArrayValueReference("Marker".getBytes());
     public static final long NOT_FOUND = -1L;
 
     private ComponentMetadataUtil() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1523
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie97b2133ebecb7380cf0ba336e60ed714d06f8ee
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>

Reply via email to