abdullah alamoudi has submitted this change and it was merged.

Change subject: Fix creation of callback factories
......................................................................


Fix creation of callback factories

Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1613
Tested-by: Jenkins <[email protected]>
Reviewed-by: Michael Blow <[email protected]>
---
M asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
8 files changed, 93 insertions(+), 115 deletions(-)

Approvals:
  Michael Blow: Looks good to me, approved
  Jenkins: Verified

Objections:
  Jenkins: Violations found



diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 12f28c5..ac3caf3 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,9 @@
  */
 package org.apache.asterix.active;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.asterix.active.message.ActivePartitionMessage;
 import org.apache.asterix.common.api.IAppRuntimeContext;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +31,7 @@
 public abstract class ActiveSourceOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable
         implements IActiveRuntime {
 
+    private final Logger LOGGER = 
Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
     protected final IHyracksTaskContext ctx;
     protected final ActiveManager activeManager;
     /** A unique identifier for the runtime **/
@@ -79,6 +83,7 @@
 
     @Override
     public final void initialize() throws HyracksDataException {
+        LOGGER.log(Level.INFO, "initialize() called on 
ActiveSourceOperatorNodePushable");
         activeManager.registerRuntime(this);
         try {
             // notify cc that runtime has been registered
@@ -86,15 +91,18 @@
                     ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
             start();
         } catch (InterruptedException e) {
+            LOGGER.log(Level.INFO, "initialize() interrupted on 
ActiveSourceOperatorNodePushable", e);
             Thread.currentThread().interrupt();
             throw new HyracksDataException(e);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "initialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
         } finally {
             synchronized (this) {
                 done = true;
                 notifyAll();
             }
+            LOGGER.log(Level.INFO, "initialize() returning on 
ActiveSourceOperatorNodePushable");
         }
     }
 
@@ -105,10 +113,12 @@
             ctx.sendApplicationMessageToCC(new 
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
                     ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
         } catch (Exception e) {
+            LOGGER.log(Level.INFO, "deinitialize() failed on 
ActiveSourceOperatorNodePushable", e);
             throw new HyracksDataException(e);
+        } finally {
+            LOGGER.log(Level.INFO, "deinitialize() returning on 
ActiveSourceOperatorNodePushable");
         }
     }
-
 
     @Override
     public final IFrameWriter getInputFrameWriter(int index) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 2ff0617..4b28d42 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -30,6 +30,7 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -49,13 +50,13 @@
             IBinaryComparatorFactory[] tokenComparatorFactories, ITypeTraits[] 
invListsTypeTraits,
             IBinaryComparatorFactory[] invListComparatorFactories, 
IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IndexOperation op, 
IIndexDataflowHelperFactory dataflowHelperFactory,
-            ITupleFilterFactory tupleFilterFactory,
-            IModificationOperationCallbackFactory 
modificationOpCallbackFactory, String indexName,
+            ITupleFilterFactory tupleFilterFactory, 
IModificationOperationCallbackFactory modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory, String 
indexName,
             IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, 
lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, op, dataflowHelperFactory, 
tupleFilterFactory, modificationOpCallbackFactory,
-                pageManagerFactory);
+                searchCallbackFactory, pageManagerFactory);
         this.indexName = indexName;
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 39ea54d..f8f4a0e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -31,14 +31,12 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.IApplicationContextInfo;
 import 
org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -89,15 +87,6 @@
 import org.apache.asterix.runtime.utils.AppContextInfo;
 import org.apache.asterix.runtime.utils.ClusterStateManager;
 import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
-import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +135,6 @@
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -157,7 +145,6 @@
 public class MetadataProvider implements IMetadataProvider<DataSourceId, 
String> {
 
     private final IStorageComponentProvider storaegComponentProvider;
-    private final ITransactionSubsystemProvider txnSubsystemProvider;
     private final IMetadataPageManagerFactory metadataPageManagerFactory;
     private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
     private final StorageProperties storageProperties;
@@ -182,7 +169,6 @@
         this.storaegComponentProvider = componentProvider;
         storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
         libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
-        txnSubsystemProvider = 
componentProvider.getTransactionSubsystemProvider();
         metadataPageManagerFactory = 
componentProvider.getMetadataPageManagerFactory();
         primitiveValueProviderFactory = 
componentProvider.getPrimitiveValueProviderFactory();
     }
@@ -457,7 +443,6 @@
         boolean isSecondary = true;
         int numSecondaryKeys = 0;
         try {
-            boolean temp = dataset.getDatasetDetails().isTemp();
             Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), dataset.getDatasetName());
             if (primaryIndex != null && (dataset.getDatasetType() != 
DatasetType.EXTERNAL)) {
@@ -521,27 +506,13 @@
             IApplicationContextInfo appContext = (IApplicationContextInfo) 
context.getAppContext();
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = getSplitProviderAndConstraints(dataset, 
theIndex.getIndexName());
-
-            ISearchOperationCallbackFactory searchCallbackFactory;
-            if (isSecondary) {
-                searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                        : new SecondaryIndexSearchOperationCallbackFactory();
-            } else {
-                int datasetId = dataset.getDatasetId();
-                int[] primaryKeyFields = new int[numPrimaryKeys];
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    primaryKeyFields[i] = i;
-                }
-
-                /**
-                 * Due to the read-committed isolation level,
-                 * we may acquire very short duration lock(i.e., instant lock) 
for readers.
-                 */
-                searchCallbackFactory = temp ? 
NoOpOperationCallbackFactory.INSTANCE
-                        : new 
PrimaryIndexInstantSearchOperationCallbackFactory(
-                                ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getJobId(),
-                                datasetId, primaryKeyFields, 
txnSubsystemProvider, ResourceType.LSM_BTREE);
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
             }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, theIndex, jobId, 
IndexOperation.SEARCH, primaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             RuntimeComponentsProvider rtcProvider = 
RuntimeComponentsProvider.RUNTIME_PROVIDER;
@@ -577,8 +548,6 @@
         try {
             ARecordType recType = (ARecordType) 
findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
             int numPrimaryKeys = 
DatasetUtil.getPartitioningKeys(dataset).size();
-
-            boolean temp = dataset.getDatasetDetails().isTemp();
             Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), indexName);
             if (secondaryIndex == null) {
@@ -646,8 +615,13 @@
             }
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
-            ISearchOperationCallbackFactory searchCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
+            int[] primaryKeyFields = new int[numPrimaryKeys];
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                primaryKeyFields[i] = i;
+            }
+
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, 
IndexOperation.SEARCH, primaryKeyFields);
             RTreeSearchOperatorDescriptor rtreeSearchOp;
             IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
                     secondaryIndex, recType, metaType, compactionInfo.first, 
compactionInfo.second);
@@ -998,7 +972,6 @@
                 throw new AlgebricksException(" Unabel to create merge policy 
factory for external dataset", e);
             }
 
-            boolean temp = datasetDetails.isTemp();
             String fileIndexName = 
BTreeDataflowHelperFactoryProvider.externalFileIndexName(dataset);
             Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName);
@@ -1011,8 +984,8 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
             spPc = 
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
                     dataset.getDatasetName(), fileIndexName, false);
-            ISearchOperationCallbackFactory searchOpCallbackFactory =
-                    temp ? NoOpOperationCallbackFactory.INSTANCE : new 
SecondaryIndexSearchOperationCallbackFactory();
+            ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, 
fileIndex, jobId, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new 
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, retainInput, 
appContext.getIndexLifecycleManagerProvider(),
@@ -1085,20 +1058,16 @@
                     getSplitProviderAndConstraints(dataset);
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
 
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.UPSERT, ResourceType.LSM_BTREE)
-                    : new UpsertOperationCallbackFactory(jobId, datasetId, 
primaryKeyFields, txnSubsystemProvider,
-                            Operation.UPSERT, ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
 
-            LockThenSearchOperationCallbackFactory searchCallbackFactory = new 
LockThenSearchOperationCallbackFactory(
-                    jobId, datasetId, primaryKeyFields, txnSubsystemProvider, 
ResourceType.LSM_BTREE);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, 
IndexOperation.UPSERT, primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1281,18 +1250,14 @@
                     getSplitProviderAndConstraints(dataset);
 
             // prepare callback
-            int datasetId = dataset.getDatasetId();
             int[] primaryKeyFields = new int[numKeys];
             for (i = 0; i < numKeys; i++) {
                 primaryKeyFields[i] = i;
             }
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.get(indexOp), ResourceType.LSM_BTREE)
-                    : new PrimaryIndexModificationOperationCallbackFactory(
-                            ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
-                            primaryKeyFields, txnSubsystemProvider, 
Operation.get(indexOp), ResourceType.LSM_BTREE);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, primaryIndex, jobId, indexOp, 
primaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = dataset
+                    .getSearchCallbackFactory(storaegComponentProvider, 
primaryIndex, jobId, indexOp, primaryKeyFields);
 
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1310,7 +1275,7 @@
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, null, true,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        indexName, null, modificationCallbackFactory, 
searchCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1483,15 +1448,10 @@
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_BTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchOpCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory idfh = 
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
@@ -1508,13 +1468,13 @@
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, idfh, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, 
searchOpCallbackFactory, prevFieldPermutation,
+                        metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, bloomFilterKeyFields, 
fieldPermutation, indexOp, idfh, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, 
searchOpCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1641,15 +1601,10 @@
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_RTREE);
-
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataflowHelperFactory = 
dataset.getIndexDataflowHelperFactory(this,
@@ -1666,13 +1621,13 @@
                 op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc, 
appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, 
indexDataflowHelperFactory, filterFactory, false,
-                        indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
-                        prevFieldPermutation, metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
+                        indexName, null, modificationCallbackFactory, 
searchCallbackFactory, prevFieldPermutation,
+                        metadataPageManagerFactory, 
dataset.getFrameOpCallbackFactory());
             } else {
                 op = new LSMTreeInsertDeleteOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         appContext.getIndexLifecycleManagerProvider(), 
splitsAndConstraint.first, typeTraits,
                         comparatorFactories, null, fieldPermutation, indexOp, 
indexDataflowHelperFactory, filterFactory,
-                        false, indexName, null, modificationCallbackFactory, 
NoOpOperationCallbackFactory.INSTANCE,
+                        false, indexName, null, modificationCallbackFactory, 
searchCallbackFactory,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
@@ -1851,14 +1806,10 @@
 
             // prepare callback
             JobId jobId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getJobId();
-            int datasetId = dataset.getDatasetId();
-            IModificationOperationCallbackFactory modificationCallbackFactory 
= temp
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX)
-                    : new 
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
-                            modificationCallbackPrimaryKeyFields, 
txnSubsystemProvider, Operation.get(indexOp),
-                            ResourceType.LSM_INVERTED_INDEX);
+            IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
+            ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
+                    storaegComponentProvider, secondaryIndex, jobId, indexOp, 
modificationCallbackPrimaryKeyFields);
             Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
                     DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
             IIndexDataflowHelperFactory indexDataFlowFactory = 
dataset.getIndexDataflowHelperFactory(this,
@@ -1875,14 +1826,16 @@
                 op = new LSMInvertedIndexUpsertOperatorDescriptor(spec, 
recordDesc, appContext.getStorageManager(),
                         splitsAndConstraint.first, 
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
                         tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
-                        fieldPermutation, indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
+                        fieldPermutation, indexDataFlowFactory, filterFactory, 
modificationCallbackFactory,
+                        searchCallbackFactory, indexName,
                         prevFieldPermutation, metadataPageManagerFactory);
             } else {
                 op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec, 
recordDesc,
                         appContext.getStorageManager(), 
splitsAndConstraint.first,
                         appContext.getIndexLifecycleManagerProvider(), 
tokenTypeTraits, tokenComparatorFactories,
                         invListsTypeTraits, invListComparatorFactories, 
tokenizerFactory, fieldPermutation, indexOp,
-                        indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, indexName,
+                        indexDataFlowFactory, filterFactory, 
modificationCallbackFactory, searchCallbackFactory,
+                        indexName,
                         metadataPageManagerFactory);
             }
             return new Pair<>(op, splitsAndConstraint.second);
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 82fe036..e0607a6 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -68,6 +68,7 @@
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
 import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
@@ -519,9 +520,13 @@
             int[] primaryKeyFields) throws AlgebricksException {
         if (getDatasetDetails().isTemp()) {
             return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
-                    ? new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, 
getDatasetId(),
-                            primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType())
+                    ? index.isPrimaryIndex()
+                            ? new 
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+                                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
+                            : new 
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, 
getDatasetId(),
+                                    primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(),
+                                    Operation.get(op), index.resourceType())
                     : NoOpOperationCallbackFactory.INSTANCE;
         } else if (index.isPrimaryIndex()) {
             return op == IndexOperation.UPSERT
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
index f1547a8..02c1908 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
@@ -31,14 +31,14 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
 import org.apache.hyracks.storage.common.IStorageManager;
 
-public class LSMInvertedIndexUpsertOperatorDescriptor
-        extends LSMInvertedIndexInsertDeleteOperatorDescriptor {
+public class LSMInvertedIndexUpsertOperatorDescriptor extends 
LSMInvertedIndexInsertDeleteOperatorDescriptor {
 
     private static final long serialVersionUID = 1L;
     private final int[] prevFieldPermutation;
@@ -50,18 +50,19 @@
             IBinaryComparatorFactory[] invListComparatorFactories, 
IBinaryTokenizerFactory tokenizerFactory,
             int[] fieldPermutation, IIndexDataflowHelperFactory 
dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory, 
IModificationOperationCallbackFactory modificationOpCallbackFactory,
-            String indexName, int[] prevFieldPermutation, IPageManagerFactory 
pageManagerFactory) {
+            ISearchOperationCallbackFactory searchCallbackFactory, String 
indexName, int[] prevFieldPermutation,
+            IPageManagerFactory pageManagerFactory) {
         super(spec, recDesc, storageManager, fileSplitProvider, 
lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
                 fieldPermutation, IndexOperation.UPSERT, 
dataflowHelperFactory, tupleFilterFactory,
-                modificationOpCallbackFactory, indexName, pageManagerFactory);
+                modificationOpCallbackFactory, searchCallbackFactory, 
indexName, pageManagerFactory);
         this.prevFieldPermutation = prevFieldPermutation;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
-        return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, 
partition, fieldPermutation,
-                recordDescProvider, prevFieldPermutation);
+        return new LSMSecondaryUpsertOperatorNodePushable(this, ctx, 
partition, fieldPermutation, recordDescProvider,
+                prevFieldPermutation);
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index b37ecae..e6cfd2a 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -54,13 +54,13 @@
             IBinaryComparatorFactory[] comparatorFactories, int[] 
bloomFilterKeyFields, int[] fieldPermutation,
             IIndexDataflowHelperFactory dataflowHelperFactory, 
ITupleFilterFactory tupleFilterFactory,
             boolean isPrimary, String indexName, IMissingWriterFactory 
missingWriterFactory,
-            IModificationOperationCallbackFactory 
modificationOpCallbackProvider,
+            IModificationOperationCallbackFactory 
modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackProvider, int[] 
prevValuePermutation,
             IPageManagerFactory pageManagerFactory, 
IFrameOperationCallbackFactory frameOpCallbackFactory) {
         super(spec, recDesc, storageManager, lifecycleManagerProvider, 
fileSplitProvider, typeTraits,
                 comparatorFactories, bloomFilterKeyFields, fieldPermutation, 
IndexOperation.UPSERT,
                 dataflowHelperFactory, tupleFilterFactory, isPrimary, 
indexName, missingWriterFactory,
-                modificationOpCallbackProvider, searchOpCallbackProvider, 
pageManagerFactory);
+                modificationOpCallbackFactory, searchOpCallbackProvider, 
pageManagerFactory);
         this.prevValuePermutation = prevValuePermutation;
         this.frameOpCallbackFactory = frameOpCallbackFactory;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index e9c0e5f..da3e986 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -18,20 +18,27 @@
  */
 package org.apache.hyracks.storage.am.lsm.common.impls;
 
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IndexException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
 
-public enum SynchronousScheduler implements ILSMIOOperationScheduler {
-    INSTANCE;
+public class SynchronousScheduler implements ILSMIOOperationScheduler {
+    private static final Logger LOGGER = 
Logger.getLogger(SynchronousScheduler.class.getName());
+    public static final SynchronousScheduler INSTANCE = new 
SynchronousScheduler();
+
+    private SynchronousScheduler() {
+    }
 
     @Override
     public void scheduleOperation(ILSMIOOperation operation) throws 
HyracksDataException {
         try {
             operation.call();
-        } catch (IndexException e) {
-            throw new HyracksDataException(e);
+        } catch (Exception e) {
+            LOGGER.log(Level.SEVERE, "IO Operation failed", e);
+            throw HyracksDataException.create(e);
         }
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 46201d5..a342370 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -31,9 +31,9 @@
 import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
 import 
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -55,11 +55,12 @@
             int[] fieldPermutation, IndexOperation op, 
IIndexDataflowHelperFactory dataflowHelperFactory,
             ITupleFilterFactory tupleFilterFactory,
             IModificationOperationCallbackFactory 
modificationOpCallbackFactory,
+            ISearchOperationCallbackFactory searchCallbackFactory,
             IPageManagerFactory pageManagerFactory) {
         super(spec, 1, 1, recDesc, storageManager, fileSplitProvider, 
lifecycleManagerProvider, tokenTypeTraits,
                 tokenComparatorFactories, invListsTypeTraits, 
invListComparatorFactories, tokenizerFactory,
                 dataflowHelperFactory, tupleFilterFactory, false, false,
-                null, NoOpLocalResourceFactoryProvider.INSTANCE, 
NoOpOperationCallbackFactory.INSTANCE,
+                null, NoOpLocalResourceFactoryProvider.INSTANCE, 
searchCallbackFactory,
                 modificationOpCallbackFactory,
                 pageManagerFactory);
         this.fieldPermutation = fieldPermutation;

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1613
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to