abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1613
Change subject: Fix creation of callback factories
......................................................................
Fix creation of callback factories
Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
---
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
8 files changed, 89 insertions(+), 103 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/13/1613/1
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index 12f28c5..ac3caf3 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -18,6 +18,9 @@
*/
package org.apache.asterix.active;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IAppRuntimeContext;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -28,6 +31,7 @@
public abstract class ActiveSourceOperatorNodePushable extends
AbstractUnaryOutputSourceOperatorNodePushable
implements IActiveRuntime {
+ private final Logger LOGGER =
Logger.getLogger(ActiveSourceOperatorNodePushable.class.getName());
protected final IHyracksTaskContext ctx;
protected final ActiveManager activeManager;
/** A unique identifier for the runtime **/
@@ -79,6 +83,7 @@
@Override
public final void initialize() throws HyracksDataException {
+ LOGGER.log(Level.INFO, "initialize() called on
ActiveSourceOperatorNodePushable");
activeManager.registerRuntime(this);
try {
// notify cc that runtime has been registered
@@ -86,15 +91,18 @@
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED), null);
start();
} catch (InterruptedException e) {
+ LOGGER.log(Level.INFO, "initialize() interrupted on
ActiveSourceOperatorNodePushable", e);
Thread.currentThread().interrupt();
throw new HyracksDataException(e);
} catch (Exception e) {
+ LOGGER.log(Level.INFO, "initialize() failed on
ActiveSourceOperatorNodePushable", e);
throw new HyracksDataException(e);
} finally {
synchronized (this) {
done = true;
notifyAll();
}
+ LOGGER.log(Level.INFO, "initialize() returning on
ActiveSourceOperatorNodePushable");
}
}
@@ -105,10 +113,12 @@
ctx.sendApplicationMessageToCC(new
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED), null);
} catch (Exception e) {
+ LOGGER.log(Level.INFO, "deinitialize() failed on
ActiveSourceOperatorNodePushable", e);
throw new HyracksDataException(e);
+ } finally {
+ LOGGER.log(Level.INFO, "deinitialize() returning on
ActiveSourceOperatorNodePushable");
}
}
-
@Override
public final IFrameWriter getInputFrameWriter(int index) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
index 2ff0617..e99cba5 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInvertedIndexInsertDeleteOperatorDescriptor.java
@@ -30,6 +30,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -50,11 +51,13 @@
IBinaryComparatorFactory[] invListComparatorFactories,
IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IndexOperation op,
IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
- IModificationOperationCallbackFactory
modificationOpCallbackFactory, String indexName,
+ IModificationOperationCallbackFactory
modificationOpCallbackFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory, String
indexName,
IPageManagerFactory pageManagerFactory) {
super(spec, recDesc, storageManager, fileSplitProvider,
lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits,
invListComparatorFactories, tokenizerFactory,
fieldPermutation, op, dataflowHelperFactory,
tupleFilterFactory, modificationOpCallbackFactory,
+ searchCallbackFactory,
pageManagerFactory);
this.indexName = indexName;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 39ea54d..175fe2b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -31,14 +31,12 @@
import org.apache.asterix.common.config.GlobalConfig;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
-import org.apache.asterix.common.context.ITransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.IApplicationContextInfo;
import
org.apache.asterix.common.dataflow.LSMInvertedIndexInsertDeleteOperatorDescriptor;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.LookupAdapterFactory;
@@ -89,15 +87,6 @@
import org.apache.asterix.runtime.utils.AppContextInfo;
import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.runtime.utils.RuntimeComponentsProvider;
-import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
-import
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
-import
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -146,7 +135,6 @@
import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -157,7 +145,6 @@
public class MetadataProvider implements IMetadataProvider<DataSourceId,
String> {
private final IStorageComponentProvider storaegComponentProvider;
- private final ITransactionSubsystemProvider txnSubsystemProvider;
private final IMetadataPageManagerFactory metadataPageManagerFactory;
private final IPrimitiveValueProviderFactory primitiveValueProviderFactory;
private final StorageProperties storageProperties;
@@ -182,7 +169,6 @@
this.storaegComponentProvider = componentProvider;
storageProperties = AppContextInfo.INSTANCE.getStorageProperties();
libraryManager = AppContextInfo.INSTANCE.getLibraryManager();
- txnSubsystemProvider =
componentProvider.getTransactionSubsystemProvider();
metadataPageManagerFactory =
componentProvider.getMetadataPageManagerFactory();
primitiveValueProviderFactory =
componentProvider.getPrimitiveValueProviderFactory();
}
@@ -521,27 +507,13 @@
IApplicationContextInfo appContext = (IApplicationContextInfo)
context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
spPc = getSplitProviderAndConstraints(dataset,
theIndex.getIndexName());
-
- ISearchOperationCallbackFactory searchCallbackFactory;
- if (isSecondary) {
- searchCallbackFactory = temp ?
NoOpOperationCallbackFactory.INSTANCE
- : new SecondaryIndexSearchOperationCallbackFactory();
- } else {
- int datasetId = dataset.getDatasetId();
- int[] primaryKeyFields = new int[numPrimaryKeys];
- for (int i = 0; i < numPrimaryKeys; i++) {
- primaryKeyFields[i] = i;
- }
-
- /**
- * Due to the read-committed isolation level,
- * we may acquire very short duration lock(i.e., instant lock)
for readers.
- */
- searchCallbackFactory = temp ?
NoOpOperationCallbackFactory.INSTANCE
- : new
PrimaryIndexInstantSearchOperationCallbackFactory(
- ((JobEventListenerFactory)
jobSpec.getJobletEventListenerFactory()).getJobId(),
- datasetId, primaryKeyFields,
txnSubsystemProvider, ResourceType.LSM_BTREE);
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
}
+
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, theIndex, jobId,
IndexOperation.SEARCH, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
RuntimeComponentsProvider rtcProvider =
RuntimeComponentsProvider.RUNTIME_PROVIDER;
@@ -646,8 +618,13 @@
}
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
- ISearchOperationCallbackFactory searchCallbackFactory =
- temp ? NoOpOperationCallbackFactory.INSTANCE : new
SecondaryIndexSearchOperationCallbackFactory();
+ int[] primaryKeyFields = new int[numPrimaryKeys];
+ for (int i = 0; i < numPrimaryKeys; i++) {
+ primaryKeyFields[i] = i;
+ }
+
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId,
IndexOperation.SEARCH, primaryKeyFields);
RTreeSearchOperatorDescriptor rtreeSearchOp;
IIndexDataflowHelperFactory indexDataflowHelperFactory =
dataset.getIndexDataflowHelperFactory(this,
secondaryIndex, recType, metaType, compactionInfo.first,
compactionInfo.second);
@@ -1011,8 +988,8 @@
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
spPc =
metadataProvider.splitProviderAndPartitionConstraintsForFilesIndex(dataset.getDataverseName(),
dataset.getDatasetName(), fileIndexName, false);
- ISearchOperationCallbackFactory searchOpCallbackFactory =
- temp ? NoOpOperationCallbackFactory.INSTANCE : new
SecondaryIndexSearchOperationCallbackFactory();
+ ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
+ .getSearchCallbackFactory(storaegComponentProvider,
fileIndex, jobId, IndexOperation.SEARCH, null);
// Create the operator
ExternalLookupOperatorDescriptor op = new
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
outRecDesc, indexDataflowHelperFactory, retainInput,
appContext.getIndexLifecycleManagerProvider(),
@@ -1091,14 +1068,11 @@
primaryKeyFields[i] = i;
}
- IModificationOperationCallbackFactory modificationCallbackFactory
= temp
- ? new
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider,
Operation.UPSERT, ResourceType.LSM_BTREE)
- : new UpsertOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider,
- Operation.UPSERT, ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId,
IndexOperation.UPSERT, primaryKeyFields);
- LockThenSearchOperationCallbackFactory searchCallbackFactory = new
LockThenSearchOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
ResourceType.LSM_BTREE);
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId,
IndexOperation.UPSERT, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1281,18 +1255,14 @@
getSplitProviderAndConstraints(dataset);
// prepare callback
- int datasetId = dataset.getDatasetId();
int[] primaryKeyFields = new int[numKeys];
for (i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
}
- IModificationOperationCallbackFactory modificationCallbackFactory
= temp
- ? new
TempDatasetPrimaryIndexModificationOperationCallbackFactory(
- ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider,
Operation.get(indexOp), ResourceType.LSM_BTREE)
- : new PrimaryIndexModificationOperationCallbackFactory(
- ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider,
Operation.get(indexOp), ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storaegComponentProvider, primaryIndex, jobId, indexOp,
primaryKeyFields);
+ ISearchOperationCallbackFactory searcgCallbackFactory = dataset
+ .getSearchCallbackFactory(storaegComponentProvider,
primaryIndex, jobId, indexOp, primaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1310,7 +1280,7 @@
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields,
fieldPermutation, indexOp, idfh, null, true,
- indexName, null, modificationCallbackFactory,
NoOpOperationCallbackFactory.INSTANCE,
+ indexName, null, modificationCallbackFactory,
searcgCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1483,15 +1453,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory
= temp
- ? new
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_BTREE)
- : new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_BTREE);
-
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchOpCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory idfh =
dataset.getIndexDataflowHelperFactory(this, secondaryIndex, itemType,
@@ -1508,13 +1473,13 @@
op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc,
appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields,
fieldPermutation, idfh, filterFactory, false,
- indexName, null, modificationCallbackFactory,
NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
+ indexName, null, modificationCallbackFactory,
searchOpCallbackFactory, prevFieldPermutation,
+ metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields,
fieldPermutation, indexOp, idfh, filterFactory,
- false, indexName, null, modificationCallbackFactory,
NoOpOperationCallbackFactory.INSTANCE,
+ false, indexName, null, modificationCallbackFactory,
searchOpCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1641,15 +1606,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId();
- int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory
= temp
- ? new
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_RTREE)
- : new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_RTREE);
-
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataflowHelperFactory =
dataset.getIndexDataflowHelperFactory(this,
@@ -1666,13 +1626,13 @@
op = new LSMTreeUpsertOperatorDescriptor(spec, recordDesc,
appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation,
indexDataflowHelperFactory, filterFactory, false,
- indexName, null, modificationCallbackFactory,
NoOpOperationCallbackFactory.INSTANCE,
- prevFieldPermutation, metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
+ indexName, null, modificationCallbackFactory,
searchCallbackFactory, prevFieldPermutation,
+ metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory,
- false, indexName, null, modificationCallbackFactory,
NoOpOperationCallbackFactory.INSTANCE,
+ false, indexName, null, modificationCallbackFactory,
searchCallbackFactory,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
@@ -1852,13 +1812,10 @@
// prepare callback
JobId jobId = ((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
- IModificationOperationCallbackFactory modificationCallbackFactory
= temp
- ? new
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_INVERTED_INDEX)
- : new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
- modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, Operation.get(indexOp),
- ResourceType.LSM_INVERTED_INDEX);
+ IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storaegComponentProvider, secondaryIndex, jobId, indexOp,
modificationCallbackPrimaryKeyFields);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataFlowFactory =
dataset.getIndexDataflowHelperFactory(this,
@@ -1875,14 +1832,16 @@
op = new LSMInvertedIndexUpsertOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits,
invListComparatorFactories, tokenizerFactory,
- fieldPermutation, indexDataFlowFactory, filterFactory,
modificationCallbackFactory, indexName,
+ fieldPermutation, indexDataFlowFactory, filterFactory,
modificationCallbackFactory,
+ searchCallbackFactory, indexName,
prevFieldPermutation, metadataPageManagerFactory);
} else {
op = new LSMInvertedIndexInsertDeleteOperatorDescriptor(spec,
recordDesc,
appContext.getStorageManager(),
splitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(),
tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListComparatorFactories,
tokenizerFactory, fieldPermutation, indexOp,
- indexDataFlowFactory, filterFactory,
modificationCallbackFactory, indexName,
+ indexDataFlowFactory, filterFactory,
modificationCallbackFactory, searchCallbackFactory,
+ indexName,
metadataPageManagerFactory);
}
return new Pair<>(op, splitsAndConstraint.second);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index cd8cf3b..ff54642 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -68,6 +68,7 @@
import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
+import
org.apache.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
import
org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
@@ -519,9 +520,13 @@
int[] primaryKeyFields) throws AlgebricksException {
if (getDatasetDetails().isTemp()) {
return op == IndexOperation.DELETE || op == IndexOperation.INSERT
|| op == IndexOperation.UPSERT
- ? new
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
getDatasetId(),
- primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
- index.resourceType())
+ ? index.isPrimaryIndex()
+ ? new
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(),
+ Operation.get(op), index.resourceType())
+ : new
TempDatasetSecondaryIndexModificationOperationCallbackFactory(jobId,
getDatasetId(),
+ primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(),
+ Operation.get(op), index.resourceType())
: NoOpOperationCallbackFactory.INSTANCE;
} else if (index.isPrimaryIndex()) {
return op == IndexOperation.UPSERT
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
index f1547a8..9d0b917 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMInvertedIndexUpsertOperatorDescriptor.java
@@ -31,6 +31,7 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
@@ -50,11 +51,11 @@
IBinaryComparatorFactory[] invListComparatorFactories,
IBinaryTokenizerFactory tokenizerFactory,
int[] fieldPermutation, IIndexDataflowHelperFactory
dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
IModificationOperationCallbackFactory modificationOpCallbackFactory,
- String indexName, int[] prevFieldPermutation, IPageManagerFactory
pageManagerFactory) {
+ ISearchOperationCallbackFactory searchCallbackFactory, String
indexName, int[] prevFieldPermutation, IPageManagerFactory pageManagerFactory) {
super(spec, recDesc, storageManager, fileSplitProvider,
lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits,
invListComparatorFactories, tokenizerFactory,
fieldPermutation, IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory,
- modificationOpCallbackFactory, indexName, pageManagerFactory);
+ modificationOpCallbackFactory, searchCallbackFactory,
indexName, pageManagerFactory);
this.prevFieldPermutation = prevFieldPermutation;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index b37ecae..e6cfd2a 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -54,13 +54,13 @@
IBinaryComparatorFactory[] comparatorFactories, int[]
bloomFilterKeyFields, int[] fieldPermutation,
IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
boolean isPrimary, String indexName, IMissingWriterFactory
missingWriterFactory,
- IModificationOperationCallbackFactory
modificationOpCallbackProvider,
+ IModificationOperationCallbackFactory
modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackProvider, int[]
prevValuePermutation,
IPageManagerFactory pageManagerFactory,
IFrameOperationCallbackFactory frameOpCallbackFactory) {
super(spec, recDesc, storageManager, lifecycleManagerProvider,
fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory, isPrimary,
indexName, missingWriterFactory,
- modificationOpCallbackProvider, searchOpCallbackProvider,
pageManagerFactory);
+ modificationOpCallbackFactory, searchOpCallbackProvider,
pageManagerFactory);
this.prevValuePermutation = prevValuePermutation;
this.frameOpCallbackFactory = frameOpCallbackFactory;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
index e9c0e5f..da3e986 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/SynchronousScheduler.java
@@ -18,20 +18,27 @@
*/
package org.apache.hyracks.storage.am.lsm.common.impls;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-public enum SynchronousScheduler implements ILSMIOOperationScheduler {
- INSTANCE;
+public class SynchronousScheduler implements ILSMIOOperationScheduler {
+ private static final Logger LOGGER =
Logger.getLogger(SynchronousScheduler.class.getName());
+ public static final SynchronousScheduler INSTANCE = new
SynchronousScheduler();
+
+ private SynchronousScheduler() {
+ }
@Override
public void scheduleOperation(ILSMIOOperation operation) throws
HyracksDataException {
try {
operation.call();
- } catch (IndexException e) {
- throw new HyracksDataException(e);
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "IO Operation failed", e);
+ throw HyracksDataException.create(e);
}
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
index 46201d5..a342370 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexInsertUpdateDeleteOperator.java
@@ -31,9 +31,9 @@
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.IPageManagerFactory;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
@@ -55,11 +55,12 @@
int[] fieldPermutation, IndexOperation op,
IIndexDataflowHelperFactory dataflowHelperFactory,
ITupleFilterFactory tupleFilterFactory,
IModificationOperationCallbackFactory
modificationOpCallbackFactory,
+ ISearchOperationCallbackFactory searchCallbackFactory,
IPageManagerFactory pageManagerFactory) {
super(spec, 1, 1, recDesc, storageManager, fileSplitProvider,
lifecycleManagerProvider, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits,
invListComparatorFactories, tokenizerFactory,
dataflowHelperFactory, tupleFilterFactory, false, false,
- null, NoOpLocalResourceFactoryProvider.INSTANCE,
NoOpOperationCallbackFactory.INSTANCE,
+ null, NoOpLocalResourceFactoryProvider.INSTANCE,
searchCallbackFactory,
modificationOpCallbackFactory,
pageManagerFactory);
this.fieldPermutation = fieldPermutation;
--
To view, visit https://asterix-gerrit.ics.uci.edu/1613
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Idbeacf5af01b77c5f81b59aa6acec9b13762d629
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>