Till Westmann has submitted this change and it was merged.

Change subject: Fix Upsert to Never Enforce the First Operation
......................................................................


Fix Upsert to Never Enforce the First Operation

Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1336
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Till Westmann <[email protected]>
---
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
2 files changed, 23 insertions(+), 5 deletions(-)

Approvals:
  Till Westmann: Looks good to me, approved
  Jenkins: Verified; No violations found; Verified



diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 96f9e76..536366f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -24,12 +24,14 @@
 
 import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
 import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
+import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.ILogMarkerCallback;
 import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
 import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
+import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
 import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
@@ -80,6 +82,7 @@
     private final boolean hasMeta;
     private final int filterFieldIndex;
     private final int metaFieldIndex;
+    private LockThenSearchOperationCallback searchCallback;
 
     public 
AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, 
IHyracksTaskContext ctx,
             int partition, int[] fieldPermutation, IRecordDescriptorProvider 
recordDescProvider, int numOfPrimaryKeys,
@@ -140,8 +143,9 @@
             modCallback = 
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
                     indexHelper.getResourcePath(), 
indexHelper.getResourceID(), indexHelper.getResourcePartition(),
                     index, ctx, this);
-            indexAccessor = index.createAccessor(modCallback, 
opDesc.getSearchOpCallbackFactory()
-                    
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
+            searchCallback = (LockThenSearchOperationCallback) 
opDesc.getSearchOpCallbackFactory()
+                    
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this);
+            indexAccessor = index.createAccessor(modCallback, searchCallback);
             cursor = indexAccessor.createSearchCursor(false);
             frameTuple = new FrameTupleReference();
             IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) 
ctx.getJobletContext()
@@ -167,6 +171,12 @@
         }
         if (recordWasInserted || recordWasDeleted) {
             FrameUtils.appendToWriter(writer, appender, 
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
+        } else {
+            try {
+                searchCallback.release();
+            } catch (ACIDException e) {
+                throw new HyracksDataException(e);
+            }
         }
     }
 
@@ -185,6 +195,7 @@
         accessor.reset(buffer);
         LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessor;
         int tupleCount = accessor.getTupleCount();
+        boolean firstModification = true;
         int i = 0;
         try {
             while (i < tupleCount) {
@@ -217,8 +228,9 @@
                         tb.addFieldEndOffset();
                     }
                     modCallback.setOp(Operation.DELETE);
-                    if (i == 0) {
+                    if (firstModification) {
                         lsmAccessor.delete(prevTuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceDelete(prevTuple);
                     }
@@ -236,8 +248,9 @@
                 }
                 if (!isNull(tuple, numOfPrimaryKeys)) {
                     modCallback.setOp(Operation.INSERT);
-                    if ((prevTuple == null) && (i == 0)) {
+                    if (firstModification) {
                         lsmAccessor.insert(tuple);
+                        firstModification = false;
                     } else {
                         lsmAccessor.forceInsert(tuple);
                     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index ef3b218..288e7a5 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -42,6 +42,7 @@
     private final LSMIndexInsertUpdateDeleteOperatorNodePushable 
operatorNodePushable;
     private final ILogManager logManager;
     private final ILogRecord logRecord;
+    private int pkHash;
 
     public LockThenSearchOperationCallback(int datasetId, int[] 
entityIdFields, ITransactionSubsystem txnSubsystem,
             ITransactionContext txnCtx, IOperatorNodePushable 
operatorNodePushable) {
@@ -75,7 +76,7 @@
 
     @Override
     public void before(ITupleReference tuple) throws HyracksDataException {
-        int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
+        pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
         try {
             if (operatorNodePushable != null) {
 
@@ -122,4 +123,8 @@
     private void logWait() throws ACIDException {
         logManager.log(logRecord);
     }
+
+    public void release() throws ACIDException {
+        lockManager.unlock(datasetId, pkHash, LockMode.X, txnCtx);
+    }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1336
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I8ec784e2d6ff39758ab701d4f36fc85c278178f2
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Till Westmann <[email protected]>
Gerrit-Reviewer: Yingyi Bu <[email protected]>

Reply via email to