Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2875

Change subject: [NO ISSUE][TX] Fix Concurrent Access in TransactionContext
......................................................................

[NO ISSUE][TX] Fix Concurrent Access in TransactionContext

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Ensure all access to TransactionContext is thread safe.

Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
---
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
3 files changed, 25 insertions(+), 31 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/75/2875/1

diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
index 95cabf9..cc27cd8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,7 +50,7 @@
         txnState = new AtomicInteger(ITransactionManager.ACTIVE);
         isTimeout = false;
         isWriteTxn = new AtomicBoolean();
-        txnOpTrackers = new HashMap<>();
+        txnOpTrackers = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -120,13 +120,11 @@
     @Override
     public void register(long resourceId, int partition, ILSMIndex index, 
IModificationOperationCallback callback,
             boolean primaryIndex) {
-        synchronized (txnOpTrackers) {
-            if (!txnOpTrackers.containsKey(resourceId)) {
-                final ITransactionOperationTracker txnOpTracker =
-                        (ITransactionOperationTracker) 
index.getOperationTracker();
-                txnOpTrackers.put(resourceId, txnOpTracker);
-                txnOpTracker.beforeTransaction(resourceId);
-            }
+        if (!txnOpTrackers.containsKey(resourceId)) {
+            final ITransactionOperationTracker txnOpTracker =
+                    (ITransactionOperationTracker) index.getOperationTracker();
+            txnOpTrackers.put(resourceId, txnOpTracker);
+            txnOpTracker.beforeTransaction(resourceId);
         }
     }
 
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
index 079e99a..49cd82b 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
@@ -35,9 +35,9 @@
 @ThreadSafe
 public class AtomicTransactionContext extends AbstractTransactionContext {
 
-    private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
-    private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
-    private final Map<Long, IModificationOperationCallback> callbacks = new 
HashMap<>();
+    private final Map<Long, ILSMOperationTracker> opTrackers = new 
ConcurrentHashMap<>();
+    private final Map<Long, AtomicInteger> indexPendingOps = new 
ConcurrentHashMap<>();
+    private final Map<Long, IModificationOperationCallback> callbacks = new 
ConcurrentHashMap<>();
 
     public AtomicTransactionContext(TxnId txnId) {
         super(txnId);
@@ -47,12 +47,10 @@
     public void register(long resourceId, int partition, ILSMIndex index, 
IModificationOperationCallback callback,
             boolean primaryIndex) {
         super.register(resourceId, partition, index, callback, primaryIndex);
-        synchronized (txnOpTrackers) {
-            if (primaryIndex && !opTrackers.containsKey(resourceId)) {
-                opTrackers.put(resourceId, index.getOperationTracker());
-                callbacks.put(resourceId, callback);
-                indexPendingOps.put(resourceId, new AtomicInteger(0));
-            }
+        if (primaryIndex && !opTrackers.containsKey(resourceId)) {
+            opTrackers.put(resourceId, index.getOperationTracker());
+            callbacks.put(resourceId, callback);
+            indexPendingOps.put(resourceId, new AtomicInteger(0));
         }
     }
 
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
index 9fcb08b..188bb1b 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -18,8 +18,8 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
@@ -42,23 +42,21 @@
 
     public EntityLevelTransactionContext(TxnId txnId) {
         super(txnId);
-        this.primaryIndexTrackers = new HashMap<>();
-        this.resourcePendingOps = new HashMap<>();
-        this.partitionPendingOps = new HashMap<>();
+        this.primaryIndexTrackers = new ConcurrentHashMap<>();
+        this.resourcePendingOps = new ConcurrentHashMap<>();
+        this.partitionPendingOps = new ConcurrentHashMap<>();
     }
 
     @Override
     public void register(long resourceId, int partition, ILSMIndex index, 
IModificationOperationCallback callback,
             boolean primaryIndex) {
         super.register(resourceId, partition, index, callback, primaryIndex);
-        synchronized (txnOpTrackers) {
-            AtomicInteger pendingOps = 
partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
-            resourcePendingOps.put(resourceId, pendingOps);
-            if (primaryIndex) {
-                Pair<PrimaryIndexOperationTracker, 
IModificationOperationCallback> pair =
-                        new Pair<>((PrimaryIndexOperationTracker) 
index.getOperationTracker(), callback);
-                primaryIndexTrackers.put(partition, pair);
-            }
+        AtomicInteger pendingOps = 
partitionPendingOps.computeIfAbsent(partition, p -> new AtomicInteger(0));
+        resourcePendingOps.put(resourceId, pendingOps);
+        if (primaryIndex) {
+            Pair<PrimaryIndexOperationTracker, IModificationOperationCallback> 
pair =
+                    new Pair<>((PrimaryIndexOperationTracker) 
index.getOperationTracker(), callback);
+            primaryIndexTrackers.put(partition, pair);
         }
     }
 

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2875
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id7cc9e67cd51e06cf78b0ea231d3970e5199573c
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to