Till Westmann has submitted this change and it was merged.

Change subject: Fix Upsert to Never Enforce the First Operation
......................................................................
Fix Upsert to Never Enforce the First Operation Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1336 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java 2 files changed, 23 insertions(+), 5 deletions(-) Approvals: Till Westmann: Looks good to me, approved Jenkins: Verified; No violations found; Verified diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java index 96f9e76..536366f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java @@ -24,12 +24,14 @@ import org.apache.asterix.common.api.IAsterixAppRuntimeContext; import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil; +import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback; import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback; import 
org.apache.hyracks.api.comm.VSizeFrame; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.IMissingWriter; @@ -80,6 +82,7 @@ private final boolean hasMeta; private final int filterFieldIndex; private final int metaFieldIndex; + private LockThenSearchOperationCallback searchCallback; public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx, int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys, @@ -140,8 +143,9 @@ modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback( indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(), index, ctx, this); - indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory() - .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this)); + searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory() + .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this); + indexAccessor = index.createAccessor(modCallback, searchCallback); cursor = indexAccessor.createSearchCursor(false); frameTuple = new FrameTupleReference(); IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext() @@ -167,6 +171,12 @@ } if (recordWasInserted || recordWasDeleted) { FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize()); + } else { + try { + searchCallback.release(); + } catch (ACIDException e) { + throw new HyracksDataException(e); + } } } @@ -185,6 +195,7 @@ accessor.reset(buffer); LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor; int tupleCount = accessor.getTupleCount(); + boolean firstModification = true; int i = 0; try { while (i < tupleCount) { @@ -217,8 +228,9 @@ tb.addFieldEndOffset(); } modCallback.setOp(Operation.DELETE); - if (i == 0) { + if 
(firstModification) { lsmAccessor.delete(prevTuple); + firstModification = false; } else { lsmAccessor.forceDelete(prevTuple); } @@ -236,8 +248,9 @@ } if (!isNull(tuple, numOfPrimaryKeys)) { modCallback.setOp(Operation.INSERT); - if ((prevTuple == null) && (i == 0)) { + if (firstModification) { lsmAccessor.insert(tuple); + firstModification = false; } else { lsmAccessor.forceInsert(tuple); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java index ef3b218..288e7a5 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java @@ -42,6 +42,7 @@ private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable; private final ILogManager logManager; private final ILogRecord logRecord; + private int pkHash; public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) { @@ -75,7 +76,7 @@ @Override public void before(ITupleReference tuple) throws HyracksDataException { - int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields); + pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields); try { if (operatorNodePushable != null) { @@ -122,4 +123,8 @@ private void logWait() throws ACIDException { logManager.log(logRecord); } + + public void release() throws ACIDException { + lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx); + } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1336 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings 
Gerrit-MessageType: merged
Gerrit-Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>
