abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1711
Change subject: Disable search in upsert with no secondary indexes
......................................................................
Disable search in upsert with no secondary indexes
Change-Id: I4af1dc73ec909b98ce0c981c687f1b9bf13030fa
---
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
3 files changed, 69 insertions(+), 51 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/11/1711/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index d111eb2..22ba884 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1073,6 +1073,9 @@
primaryKeyFields[i] = i;
}
+ boolean hasSecondaries = MetadataManager.INSTANCE
+ .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName()).size() > 1;
+
IModificationOperationCallbackFactory modificationCallbackFactory
= dataset.getModificationCallbackFactory(
storaegComponentProvider, primaryIndex, jobId,
IndexOperation.UPSERT, primaryKeyFields);
@@ -1127,7 +1130,7 @@
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields,
fieldPermutation, idfh, null, true, indexName,
context.getMissingWriterFactory(),
modificationCallbackFactory, searchCallbackFactory, null,
- metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
+ metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory(), hasSecondaries);
op.setType(itemType);
op.setFilterIndex(fieldIdx);
return new Pair<>(op, splitsAndConstraint.second);
@@ -1479,7 +1482,7 @@
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, bloomFilterKeyFields,
fieldPermutation, idfh, filterFactory, false,
indexName, null, modificationCallbackFactory,
searchOpCallbackFactory, prevFieldPermutation,
- metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
+ metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory(), true);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
@@ -1632,7 +1635,7 @@
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
comparatorFactories, null, fieldPermutation,
indexDataflowHelperFactory, filterFactory, false,
indexName, null, modificationCallbackFactory,
searchCallbackFactory, prevFieldPermutation,
- metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory());
+ metadataPageManagerFactory,
dataset.getFrameOpCallbackFactory(), true);
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec,
recordDesc, appContext.getStorageManager(),
appContext.getIndexLifecycleManagerProvider(),
splitsAndConstraint.first, typeTraits,
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index ed1a247..03ef88f 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -89,11 +89,12 @@
private IFrameOperationCallback frameOpCallback;
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private AbstractIndexModificationOperationCallback abstractModCallback;
+ private final boolean hasSecondaries;
public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor
opDesc, IHyracksTaskContext ctx, int partition,
int[] fieldPermutation, IRecordDescriptorProvider
recordDescProvider, int numOfPrimaryKeys,
- ARecordType recordType, int filterFieldIndex,
IFrameOperationCallbackFactory frameOpCallbackFactory)
- throws HyracksDataException {
+ ARecordType recordType, int filterFieldIndex,
IFrameOperationCallbackFactory frameOpCallbackFactory,
+ boolean hasSecondaries) throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider,
IndexOperation.UPSERT);
this.key = new PermutingFrameTupleReference();
this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -116,6 +117,7 @@
this.prevRecWithPKWithFilterValue = new
ArrayTupleBuilder(fieldPermutation.length + (hasMeta ? 1 : 0));
this.prevDos = prevRecWithPKWithFilterValue.getDataOutput();
}
+ this.hasSecondaries = hasSecondaries;
}
// we have the permutation which has [pk locations, record location,
optional:filter-location]
@@ -213,50 +215,47 @@
boolean recordWasInserted = false;
tuple.reset(accessor, i);
resetSearchPredicate(i);
- lsmAccessor.search(cursor, searchPred);
- if (cursor.hasNext()) {
- cursor.next();
- prevTuple = cursor.getTuple();
- cursor.reset();
- if (isFiltered) {
- prevTuple = getPrevTupleWithFilter(prevTuple);
- }
- dos.write(prevTuple.getFieldData(numOfPrimaryKeys),
prevTuple.getFieldStart(numOfPrimaryKeys),
- prevTuple.getFieldLength(numOfPrimaryKeys));
- tb.addFieldEndOffset();
- // if has meta, then append meta
- if (hasMeta) {
- dos.write(prevTuple.getFieldData(metaFieldIndex),
prevTuple.getFieldStart(metaFieldIndex),
- prevTuple.getFieldLength(metaFieldIndex));
- tb.addFieldEndOffset();
- }
- // if with filters, append the filter
- if (isFiltered) {
- dos.write(prevTuple.getFieldData(filterFieldIndex),
prevTuple.getFieldStart(filterFieldIndex),
- prevTuple.getFieldLength(filterFieldIndex));
- tb.addFieldEndOffset();
- }
- if (isNull(tuple, numOfPrimaryKeys)) {
- // Only delete if it is a delete and not upsert
- abstractModCallback.setOp(Operation.DELETE);
- if (firstModification) {
- lsmAccessor.delete(prevTuple);
- firstModification = false;
- } else {
- lsmAccessor.forceDelete(prevTuple);
+ if (hasSecondaries || isNull(tuple, numOfPrimaryKeys)) {
+ lsmAccessor.search(cursor, searchPred);
+ if (cursor.hasNext()) {
+ cursor.next();
+ prevTuple = cursor.getTuple();
+ cursor.reset();
+ if (isFiltered) {
+ prevTuple = getPrevTupleWithFilter(prevTuple);
}
+ dos.write(prevTuple.getFieldData(numOfPrimaryKeys),
prevTuple.getFieldStart(numOfPrimaryKeys),
+ prevTuple.getFieldLength(numOfPrimaryKeys));
+ tb.addFieldEndOffset();
+ // if has meta, then append meta
+ if (hasMeta) {
+ dos.write(prevTuple.getFieldData(metaFieldIndex),
prevTuple.getFieldStart(metaFieldIndex),
+ prevTuple.getFieldLength(metaFieldIndex));
+ tb.addFieldEndOffset();
+ }
+ // if with filters, append the filter
+ if (isFiltered) {
+ dos.write(prevTuple.getFieldData(filterFieldIndex),
+ prevTuple.getFieldStart(filterFieldIndex),
+
prevTuple.getFieldLength(filterFieldIndex));
+ tb.addFieldEndOffset();
+ }
+ if (isNull(tuple, numOfPrimaryKeys)) {
+ // Only delete if it is a delete and not upsert
+ abstractModCallback.setOp(Operation.DELETE);
+ if (firstModification) {
+ lsmAccessor.delete(prevTuple);
+ firstModification = false;
+ } else {
+ lsmAccessor.forceDelete(prevTuple);
+ }
+ }
+ } else {
+ appendNullPreviousTuple();
}
} else {
- prevTuple = null;
- addNullField();
- if (hasMeta) {
- addNullField();
- }
- // if with filters, append null
- if (isFiltered) {
- addNullField();
- }
- cursor.reset();
+ searchCallback.before(key);
+ appendNullPreviousTuple();
}
if (!isNull(tuple, numOfPrimaryKeys)) {
abstractModCallback.setOp(Operation.UPSERT);
@@ -279,6 +278,19 @@
}
}
+ private void appendNullPreviousTuple() throws IOException {
+ prevTuple = null;
+ addNullField();
+ if (hasMeta) {
+ addNullField();
+ }
+ // if with filters, append null
+ if (isFiltered) {
+ addNullField();
+ }
+ cursor.reset();
+ }
+
/**
* Flushes tuples (which have already been written to tuple appender's
buffer in writeOutput() method)
* to the next operator/consumer.
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
index e6cfd2a..52b52da 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMTreeUpsertOperatorDescriptor.java
@@ -47,6 +47,7 @@
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private ARecordType type;
private int filterIndex = -1;
+ private final boolean hasSecondaries;
public LSMTreeUpsertOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor recDesc,
IStorageManager storageManager, IIndexLifecycleManagerProvider
lifecycleManagerProvider,
@@ -56,23 +57,25 @@
boolean isPrimary, String indexName, IMissingWriterFactory
missingWriterFactory,
IModificationOperationCallbackFactory
modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackProvider, int[]
prevValuePermutation,
- IPageManagerFactory pageManagerFactory,
IFrameOperationCallbackFactory frameOpCallbackFactory) {
+ IPageManagerFactory pageManagerFactory,
IFrameOperationCallbackFactory frameOpCallbackFactory,
+ boolean hasSecondaries) {
super(spec, recDesc, storageManager, lifecycleManagerProvider,
fileSplitProvider, typeTraits,
comparatorFactories, bloomFilterKeyFields, fieldPermutation,
IndexOperation.UPSERT,
dataflowHelperFactory, tupleFilterFactory, isPrimary,
indexName, missingWriterFactory,
modificationOpCallbackFactory, searchOpCallbackProvider,
pageManagerFactory);
this.prevValuePermutation = prevValuePermutation;
this.frameOpCallbackFactory = frameOpCallbackFactory;
+ this.hasSecondaries = hasSecondaries;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
return isPrimary()
- ? new LSMPrimaryUpsertOperatorNodePushable(this, ctx,
partition, fieldPermutation,
- recordDescProvider, comparatorFactories.length, type,
filterIndex, frameOpCallbackFactory)
- : new LSMSecondaryUpsertOperatorNodePushable(this, ctx,
partition, fieldPermutation,
- recordDescProvider, prevValuePermutation);
+ ? new LSMPrimaryUpsertOperatorNodePushable(this, ctx,
partition, fieldPermutation, recordDescProvider,
+ comparatorFactories.length, type, filterIndex,
frameOpCallbackFactory, hasSecondaries)
+ : new LSMSecondaryUpsertOperatorNodePushable(this, ctx,
partition, fieldPermutation, recordDescProvider,
+ prevValuePermutation);
}
public void setType(ARecordType type) {
--
To view, visit https://asterix-gerrit.ics.uci.edu/1711
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4af1dc73ec909b98ce0c981c687f1b9bf13030fa
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>