Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/2156

Change subject: [NO ISSUE][TX] Introduce Atomic Transactions
......................................................................

[NO ISSUE][TX] Introduce Atomic Transactions

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Redesign and simplify ITransactionManager API
  - Redesign and simplify ITransactionContext API

Details:
- Introduce atomic transactions. Unlike entity level transaction,
  atomic transaction do not generate any entity commit logs and
  may modify multiple primary indexes. Therefore, either all the
  operations of an atomic transaction will be committed or nothing.
  Atomic transaction are used by metadata transaction, while other
  transaction still use entity level transactions.
- Add index resource id to AbstractOperationCallback.
- Refactor metadata index modification code.
- Remove unused class MutableResourceId
- Remove unused class FieldsHashValueGenerator
- Add test case for concurrent metadata transactions.

Change-Id: I13db1c15f8afbdaae608ff0a7468fe62bf1daccd
---
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
A 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
D 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
D 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
D 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
A 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
38 files changed, 876 insertions(+), 757 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/56/2156/1

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 00b185d..5c7d450 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -146,7 +146,7 @@
         lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
         indexDataflowHelper.close();
         nc.newJobId();
-        txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+        txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx));
         insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE, 
META_TYPE, null, KEY_INDEXES,
                 KEY_INDICATORS_LIST, storageManager).getLeft();
     }
@@ -188,7 +188,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
 
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
@@ -239,8 +239,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
-
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -255,7 +254,7 @@
 
             // insert again
             nc.newJobId();
-            txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+            txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx));
             insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, 
RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
                     KEY_INDICATORS_LIST, storageManager).getLeft();
             insertOp.open();
@@ -267,7 +266,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             searchAndAssertCount(nc, ctx, dataset, storageManager, 
TOTAL_NUM_OF_RECORDS);
             // rollback the last disk component
             lsmAccessor = 
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -308,7 +307,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
 
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
@@ -378,7 +377,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -434,7 +433,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -498,7 +497,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -558,7 +557,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -620,7 +619,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
@@ -691,7 +690,7 @@
                 tupleAppender.write(insertOp, true);
             }
             insertOp.close();
-            nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+            nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             // get all components
             List<ILSMMemoryComponent> memComponents = 
lsmBtree.getMemoryComponents();
             List<ILSMDiskComponent> diskComponents = 
lsmBtree.getDiskComponents();
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 82eb16a..f5e833b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -119,8 +119,7 @@
                         storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(true);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx));
                 LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATORS_LIST, storageManager).getLeft();
                 insertOp.open();
@@ -147,7 +146,7 @@
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+                
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
                 IndexDataflowHelperFactory iHelperFactory =
                         new IndexDataflowHelperFactory(nc.getStorageManager(), 
indexInfo.getFileSplitProvider());
                 IIndexDataflowHelper dataflowHelper =
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 5384c92..39ac972 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -34,7 +34,6 @@
 import org.apache.asterix.common.configuration.Property;
 import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
 import org.apache.asterix.common.transactions.Checkpoint;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -128,8 +127,7 @@
                         KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+                ITransactionContext txnCtx = 
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx));
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATOR_LIST, storageManager).getLeft();
@@ -202,7 +200,7 @@
                     tupleAppender.write(insertOp, true);
                 }
                 insertOp.close();
-                nc.getTransactionManager().completedTransaction(txnCtx, 
DatasetId.NULL, -1, true);
+                
nc.getTransactionManager().commitTransaction(txnCtx.getTxnId());
             } finally {
                 nc.deInit();
             }
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index 3969ec5..3e906b4 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -21,11 +21,20 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.MetadataTransactionContext;
@@ -36,7 +45,9 @@
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.test.common.TestExecutor;
 import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -137,4 +148,88 @@
             MetadataManager.INSTANCE.commitTransaction(readMdTxn);
         }
     }
+
+    @Test
+    public void concurrentMetadataTxn() throws Exception {
+        // get create type and dataset
+        String datasetName = "dataset1";
+        final TestCaseContext.OutputFormat format = 
TestCaseContext.OutputFormat.CLEAN_JSON;
+        testExecutor.executeSqlppUpdateOrDdl("CREATE TYPE KeyType AS { id: int 
};", format);
+        testExecutor.executeSqlppUpdateOrDdl("CREATE DATASET " + datasetName + 
"(KeyType) PRIMARY KEY id;", format);
+
+        // get created dataset
+        ICcApplicationContext appCtx =
+                (ICcApplicationContext) 
integrationUtil.getClusterControllerService().getApplicationContext();
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(mdTxnCtx);
+        Dataset sourceDataset;
+        try {
+            sourceDataset = 
metadataProvider.findDataset(MetadataBuiltinEntities.DEFAULT_DATAVERSE_NAME, 
datasetName);
+            MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+
+        /*
+         * Concurrently insert copies of the created dataset with
+         * different names and either commit or abort the transaction.
+         */
+        final AtomicInteger failCount = new AtomicInteger(0);
+        Thread transactor1 = new Thread(() -> IntStream.range(1, 
100).forEach(x -> {
+            try {
+                addDataset(appCtx, sourceDataset, x, x % 2 == 0);
+            } catch (Exception e) {
+                e.printStackTrace();
+                failCount.incrementAndGet();
+            }
+        }));
+
+        Thread transactor2 = new Thread(() -> IntStream.range(101, 
200).forEach(x -> {
+            try {
+                addDataset(appCtx, sourceDataset, x, x % 3 == 0);
+            } catch (Exception e) {
+                e.printStackTrace();
+                failCount.incrementAndGet();
+            }
+        }));
+
+        transactor1.start();
+        transactor2.start();
+        transactor1.join();
+        transactor2.join();
+
+        Assert.assertEquals(0, failCount.get());
+
+        // make sure all metadata indexes have no pending operations after all 
txns committed/aborted
+        final IDatasetLifecycleManager datasetLifecycleManager =
+                ((INcApplicationContext) 
integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
+        int maxMetadatasetId = 14;
+        for (int i = 1; i <= maxMetadatasetId; i++) {
+            if (datasetLifecycleManager.getIndex(i, i) != null) {
+                final PrimaryIndexOperationTracker opTracker = 
datasetLifecycleManager.getOperationTracker(i);
+                Assert.assertEquals(0, opTracker.getNumActiveOperations());
+            }
+        }
+    }
+
+    private void addDataset(ICcApplicationContext appCtx, Dataset source, int 
datasetPostfix, boolean abort)
+            throws Exception {
+        Dataset dataset = new Dataset(source.getDataverseName(), "ds_" + 
datasetPostfix, source.getDataverseName(),
+                source.getDatasetType().name(), source.getNodeGroupName(), 
NoMergePolicyFactory.NAME, null,
+                source.getDatasetDetails(), source.getHints(), 
DatasetConfig.DatasetType.INTERNAL, datasetPostfix, 0);
+        MetadataProvider metadataProvider = new MetadataProvider(appCtx, null);
+        final MetadataTransactionContext writeTxn = 
MetadataManager.INSTANCE.beginTransaction();
+        metadataProvider.setMetadataTxnContext(writeTxn);
+        try {
+            MetadataManager.INSTANCE.addDataset(writeTxn, dataset);
+            if (abort) {
+                MetadataManager.INSTANCE.abortTransaction(writeTxn);
+            } else {
+                MetadataManager.INSTANCE.commitTransaction(writeTxn);
+            }
+        } finally {
+            metadataProvider.getLocks().unlock();
+        }
+    }
 }
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index eb47248..cb83d56 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -128,8 +128,6 @@
                         KEY_INDICATOR_LIST);
                 IHyracksTaskContext ctx = nc.createTestContext(false);
                 nc.newJobId();
-                ITransactionContext txnCtx =
-                        
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
                 // Prepare insert operation
                 LSMInsertDeleteOperatorNodePushable insertOp = 
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
                         RECORD_TYPE, META_TYPE, null, KEY_INDEXES, 
KEY_INDICATOR_LIST, storageManager).getLeft();
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 6f35a3d..ababe9c 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -174,7 +174,7 @@
         //modificationCallback can be NoOpOperationCallback when redo/undo 
operations are executed.
         if (modificationCallback != NoOpOperationCallback.INSTANCE) {
             numActiveOperations.incrementAndGet();
-            ((AbstractOperationCallback) 
modificationCallback).incrementLocalNumActiveOperations();
+            ((AbstractOperationCallback) 
modificationCallback).beforeOperation();
         }
     }
 
@@ -182,7 +182,7 @@
         //modificationCallback can be NoOpOperationCallback when redo/undo 
operations are executed.
         if (modificationCallback != NoOpOperationCallback.INSTANCE) {
             numActiveOperations.decrementAndGet();
-            ((AbstractOperationCallback) 
modificationCallback).decrementLocalNumActiveOperations();
+            ((AbstractOperationCallback) 
modificationCallback).afterOperation();
         }
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
index 9844344..098bbd4 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallback.java
@@ -31,14 +31,16 @@
     protected final ITransactionContext txnCtx;
     protected final ILockManager lockManager;
     protected final long[] longHashes;
+    protected final long resourceId;
 
-    public AbstractOperationCallback(DatasetId datasetId, int[] 
primaryKeyFields, ITransactionContext txnCtx,
-            ILockManager lockManager) {
+    public AbstractOperationCallback(DatasetId datasetId, long resourceId, 
int[] primaryKeyFields,
+            ITransactionContext txnCtx, ILockManager lockManager) {
         this.datasetId = datasetId;
+        this.resourceId = resourceId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnCtx = txnCtx;
         this.lockManager = lockManager;
-        this.longHashes = new long[2];
+        longHashes = new long[2];
     }
 
     public int computePrimaryKeyHashValue(ITupleReference tuple, int[] 
primaryKeyFields) {
@@ -46,12 +48,11 @@
         return Math.abs((int) longHashes[0]);
     }
 
-    public void incrementLocalNumActiveOperations() {
-        txnCtx.incrementNumActiveOperations();
+    public void beforeOperation() {
+        txnCtx.beforeOperation(resourceId);
     }
 
-    public void decrementLocalNumActiveOperations() {
-        txnCtx.decrementNumActiveOperations();
+    public void afterOperation() {
+        txnCtx.afterOperation(resourceId);
     }
-
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index 3dda5d3..c4a2d03 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -19,43 +19,132 @@
 package org.apache.asterix.common.transactions;
 
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
 
+/**
+ * A typical transaction lifecycle goes through the following steps:
+ * 1. {@link ITransactionContext#register(long, ILSMIndex, 
IModificationOperationCallback, boolean)}
+ * 2. {@link ITransactionContext#beforeOperation(long)}
+ * 3. {@link ITransactionContext#notifyUpdateCommitted(long)}
+ * 4. {@link ITransactionContext#notifyEntityCommitted}
+ * 5. {@link ITransactionContext#afterOperation(long)}
+ * 6. {@link ITransactionContext#complete()}
+ */
 public interface ITransactionContext {
 
-    public void registerIndexAndCallback(long resourceId, ILSMIndex index, 
AbstractOperationCallback callback,
-            boolean isPrimaryIndex);
+    /**
+     * Registers {@link ILSMIndex} in the transaction. Registering an index
+     * must be done before any operation is performed on the index by this
+     * transaction.
+     *
+     * @param resourceId
+     * @param index
+     * @param callback
+     * @param primaryIndex
+     */
+    void register(long resourceId, ILSMIndex index, 
IModificationOperationCallback callback, boolean primaryIndex);
 
-    public TxnId getTxnId();
+    /**
+     * Gets the unique transaction id.
+     *
+     * @return the unique transaction id
+     */
+    TxnId getTxnId();
 
-    public void setTimeout(boolean isTimeout);
+    /**
+     * Sets a flag indicating that the transaction timed out.
+     *
+     * @param isTimeout
+     */
+    void setTimeout(boolean isTimeout);
 
-    public boolean isTimeout();
+    /**
+     * Tests if the transaction was timed out.
+     *
+     * @return true if this transaction timed out. Otherwise false.
+     */
+    boolean isTimeout();
 
-    public void setTxnState(int txnState);
+    /**
+     * Sets the state if this transaction.
+     *
+     * @param txnState
+     */
+    void setTxnState(int txnState);
 
-    public int getTxnState();
+    /**
+     * Gets the current state of this transaction.
+     *
+     * @return the current state of this transaction
+     */
+    int getTxnState();
 
-    public long getFirstLSN();
+    /**
+     * Gets the first log sequence number of this transaction.
+     *
+     * @return the first log sequence number
+     */
+    long getFirstLSN();
 
-    public long getLastLSN();
+    /**
+     * Gets the last log sequence number of this transactions.
+     *
+     * @return the last log sequence number
+     */
+    long getLastLSN();
 
-    public void setLastLSN(long LSN);
+    /**
+     * Sets the last log sequence number of this transactions.
+     *
+     * @param newValue
+     */
+    void setLastLSN(long newValue);
 
-    public boolean isWriteTxn();
+    /**
+     * Tests if this is a write transaction.
+     *
+     * @return true if this is a write transaction, otherwise false.
+     */
+    boolean isWriteTxn();
 
-    public void setWriteTxn(boolean isWriterTxn);
+    /**
+     * Sets a flag indication that this is a write transaction.
+     *
+     * @param isWriterTxn
+     */
+    void setWriteTxn(boolean isWriterTxn);
 
-    public String prettyPrint();
+    /**
+     * Called before an operation is performed on index
+     * with resource id {@code resourceId}.
+     *
+     * @param resourceId
+     */
+    void beforeOperation(long resourceId);
 
-    public void setMetadataTransaction(boolean isMetadataTxn);
+    /**
+     * Called to notify the transaction that an update log belonging
+     * to this transaction on index with {@code resourceId} has been
+     * flushed to disk.
+     *
+     * @param resourceId
+     */
+    void notifyUpdateCommitted(long resourceId);
 
-    public boolean isMetadataTransaction();
+    /**
+     * Called to notify the transaction that an entity commit
+     * log belonging to this transaction has been flushed to
+     * disk.
+     */
+    void notifyEntityCommitted();
 
-    public void notifyOptracker(boolean isJobLevelCommit);
-
-    public void incrementNumActiveOperations();
-
-    public void decrementNumActiveOperations();
+    /**
+     * Called after an operation is performed on index
+     * with resource id {@code resourceId}.
+     *
+     * @param resourceId
+     */
+    void afterOperation(long resourceId);
 
     /**
      * Called when no further operations will be performed by the transaction
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
index 77c6a9f..0ed816b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionManager.java
@@ -24,7 +24,6 @@
  * Provides APIs for managing life cycle of a transaction, that is beginning a
  * transaction and aborting/committing the transaction.
  */
-
 public interface ITransactionManager {
 
     /**
@@ -33,96 +32,69 @@
      * transaction has committed. ABORTED: The transaction has aborted.
      * TIMED_OUT: The transaction has timed out waiting to acquire a lock.
      */
-    public static final int ACTIVE = 0;
-    public static final int COMMITTED = 1;
-    public static final int ABORTED = 2;
-    public static final int TIMED_OUT = 3;
+    int ACTIVE = 0;
+    int COMMITTED = 1;
+    int ABORTED = 2;
+    int TIMED_OUT = 3;
+
+    enum AtomicityLevel {
+        /**
+         * all records are committed or nothing
+         */
+        ATOMIC,
+        /**
+         * any record with entity commit log
+         */
+        ENTITY_LEVEL
+    }
+
+    enum TransactionMode {
+        /**
+         * Transaction performs only read operations
+         */
+        READ,
+        /**
+         * Transaction may perform read and write operations
+         */
+        READ_WRITE
+    }
 
     /**
      * Begins a transaction identified by a transaction id and returns the
      * associated transaction context.
      *
      * @param txnId
-     *            a unique value for the transaction id.
-     * @return the transaction context associated with the initiated 
transaction
-     * @see ITransactionContext
+     * @param options
+     * @return The transaction context
      * @throws ACIDException
      */
-    public ITransactionContext beginTransaction(TxnId txnId) throws 
ACIDException;
+    ITransactionContext beginTransaction(TxnId txnId, TransactionOptions 
options) throws ACIDException;
 
     /**
      * Returns the transaction context of an active transaction given the
      * transaction id.
      *
      * @param txnId
-     *            a unique value for the transaction id.
-     * @param createIfNotExist
-     *            TODO
-     * @return
+     * @return The transaction context
      * @throws ACIDException
      */
-    public ITransactionContext getTransactionContext(TxnId txnId, boolean 
createIfNotExist) throws ACIDException;
+    ITransactionContext getTransactionContext(TxnId txnId) throws 
ACIDException;
 
     /**
-     * Commits a transaction.
+     * Commit a transactions
      *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
+     * @param txnId
      * @throws ACIDException
-     * @see ITransactionContextimport org.apache.hyracks.api.job.TxnId;
-     * @see ACIDException
      */
-    public void commitTransaction(ITransactionContext txnContext, DatasetId 
datasetId, int pkHash)
-            throws ACIDException;
+    void commitTransaction(TxnId txnId) throws ACIDException;
 
     /**
      * Aborts a transaction.
      *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
-     * @throws ACIDException
-     * @see ITransactionContext
-     * @see ACIDException
-     */
-    public void abortTransaction(ITransactionContext txnContext, DatasetId 
datasetId, int pkHash)
-            throws ACIDException;
-
-    /**
-     * Indicates end of all activity for a transaction. In other words, all
-     * participating threads in the transaction have completed the intended
-     * task.
-     *
-     * @param txnContext
-     *            the transaction context associated with the transaction
-     * @param datasetId
-     *            TODO
-     * @param pkHash
-     *            TODO
-     * @param success
-     *            indicates the success or failure. The transaction is 
committed
-     *            or aborted accordingly.
+     * @param txnId
      * @throws ACIDException
      */
-    public void completedTransaction(ITransactionContext txnContext, DatasetId 
datasetId, int pkHash,
-            boolean success) throws ACIDException;
-
-    /**
-     * Returns the Transaction Provider for the transaction eco-system. A
-     * transaction eco-system consists of a Log Manager, a Recovery Manager, a
-     * Transaction Manager and a Lock Manager.
-     *
-     * @see ITransactionSubsystem
-     * @return TransactionProvider
-     */
-    public ITransactionSubsystem getTransactionSubsystem();
+    void abortTransaction(TxnId txnId) throws ACIDException;
 
     /**
      * @return The current max txn id.
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
new file mode 100644
index 0000000..48dc452
--- /dev/null
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/TransactionOptions.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
+public class TransactionOptions {
+
+    private final AtomicityLevel atomicityLevel;
+
+    // TODO add TransactionMode(READ/WRITE) to options
+    public TransactionOptions(AtomicityLevel atomicityLevel) {
+        this.atomicityLevel = atomicityLevel;
+    }
+
+    public AtomicityLevel getAtomicityLevel() {
+        return atomicityLevel;
+    }
+}
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 87a5272..fae45e0 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -19,13 +19,14 @@
 
 package org.apache.asterix.metadata;
 
+import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
 import java.rmi.RemoteException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -36,12 +37,12 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.ImmutableDatasetId;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -99,7 +100,6 @@
 import 
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
 import 
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallback;
 import 
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
-import 
org.apache.asterix.transaction.management.service.transaction.TransactionContext;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
@@ -117,7 +117,6 @@
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
@@ -165,37 +164,29 @@
 
     @Override
     public void beginTransaction(TxnId transactionId) throws ACIDException, 
RemoteException {
-        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
-        txnCtx.setMetadataTransaction(true);
+        TransactionOptions options = new 
TransactionOptions(AtomicityLevel.ATOMIC);
+        
transactionSubsystem.getTransactionManager().beginTransaction(transactionId, 
options);
     }
 
     @Override
     public void commitTransaction(TxnId txnId) throws RemoteException, 
ACIDException {
-        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-        transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, 
DatasetId.NULL, -1);
+        transactionSubsystem.getTransactionManager().commitTransaction(txnId);
     }
 
     @Override
     public void abortTransaction(TxnId txnId) throws RemoteException, 
ACIDException {
-        try {
-            ITransactionContext txnCtx =
-                    
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-            
transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, 
DatasetId.NULL, -1);
-        } catch (ACIDException e) {
-            LOGGER.log(Level.WARNING, "Exception aborting transaction", e);
-            throw e;
-        }
+        transactionSubsystem.getTransactionManager().abortTransaction(txnId);
     }
 
     @Override
     public void lock(TxnId txnId, byte lockMode) throws ACIDException, 
RemoteException {
-        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
+        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
         transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, 
lockMode, txnCtx);
     }
 
     @Override
     public void unlock(TxnId txnId, byte lockMode) throws ACIDException, 
RemoteException {
-        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
+        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
         transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, 
lockMode, txnCtx);
     }
 
@@ -472,96 +463,66 @@
 
     private void insertTupleIntoIndex(TxnId txnId, IMetadataIndex 
metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceID = metadataIndex.getResourceId();
-        String resourceName = metadataIndex.getFile().getRelativePath();
-        ILSMIndex lsmIndex = (ILSMIndex) 
datasetLifecycleManager.get(resourceName);
-        try {
-            datasetLifecycleManager.open(resourceName);
-
-            // prepare a Callback for logging
-            IModificationOperationCallback modCallback =
-                    createIndexModificationCallback(txnId, resourceID, 
metadataIndex, lsmIndex, Operation.INSERT);
-
-            IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-
-            ITransactionContext txnCtx =
-                    
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-            txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceID, lsmIndex, 
(AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
-
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, 
transactionSubsystem.getLogManager());
-
-            // TODO: fix exceptions once new BTree exception model is in 
hyracks.
-            indexAccessor.forceInsert(tuple);
-            // Manually complete the operation after the insert. This is to 
decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' 
within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the 
"primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) 
txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, 
LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
-            }
-        } finally {
-            datasetLifecycleManager.close(resourceName);
-        }
+        modifyMetadataIndex(Operation.INSERT, txnId, metadataIndex, tuple);
     }
 
     private void upsertTupleIntoIndex(TxnId txnId, IMetadataIndex 
metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceId = metadataIndex.getResourceId();
+        modifyMetadataIndex(Operation.UPSERT, txnId, metadataIndex, tuple);
+    }
+
+    private void modifyMetadataIndex(Operation op, TxnId txnId, IMetadataIndex 
metadataIndex, ITupleReference tuple)
+            throws ACIDException, HyracksDataException {
         String resourceName = metadataIndex.getFile().getRelativePath();
         ILSMIndex lsmIndex = (ILSMIndex) 
datasetLifecycleManager.get(resourceName);
         datasetLifecycleManager.open(resourceName);
         try {
-            // prepare a Callback for logging
-            ITransactionContext txnCtx =
-                    
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-            IModificationOperationCallback modCallback =
-                    new UpsertOperationCallback(metadataIndex.getDatasetId(), 
metadataIndex.getPrimaryKeyIndexes(),
-                            txnCtx, transactionSubsystem.getLockManager(), 
transactionSubsystem, resourceId,
-                            metadataStoragePartition, ResourceType.LSM_BTREE, 
Operation.UPSERT);
+            ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId);
+            IModificationOperationCallback modCallback = 
createIndexModificationCallback(op, txnCtx, metadataIndex);
             IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
             ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
             txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceId, lsmIndex, 
(AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
+            txnCtx.register(metadataIndex.getResourceId(), lsmIndex, 
modCallback, metadataIndex.isPrimaryIndex());
             LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, 
transactionSubsystem.getLogManager());
-            indexAccessor.forceUpsert(tuple);
-            // Manually complete the operation after the insert. This is to 
decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' 
within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the 
"primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) 
txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, 
LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
+            switch (op) {
+                case INSERT:
+                    indexAccessor.insert(tuple);
+                    break;
+                case DELETE:
+                    indexAccessor.delete(tuple);
+                    break;
+                case UPSERT:
+                    indexAccessor.upsert(tuple);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown operation type: " 
+ op);
             }
         } finally {
             datasetLifecycleManager.close(resourceName);
         }
     }
 
-    private IModificationOperationCallback 
createIndexModificationCallback(TxnId txnId, long resourceId,
-            IMetadataIndex metadataIndex, ILSMIndex lsmIndex, Operation 
indexOp) throws ACIDException {
-        ITransactionContext txnCtx = 
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-
-        // Regardless of the index type (primary or secondary index), 
secondary index
-        // modification callback is given
-        // This is still correct since metadata index operation doesn't 
require any lock
-        // from ConcurrentLockMgr and
-        // The difference between primaryIndexModCallback and 
secondaryIndexModCallback
-        // is that primary index requires
-        // locks and secondary index doesn't.
-        return new 
SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
-                metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
-                transactionSubsystem, resourceId, metadataStoragePartition, 
ResourceType.LSM_BTREE, indexOp);
+    private IModificationOperationCallback 
createIndexModificationCallback(Operation indexOp,
+            ITransactionContext txnCtx, IMetadataIndex metadataIndex) {
+        switch (indexOp) {
+            case INSERT:
+            case DELETE:
+                /*
+                 * Regardless of the index type (primary or secondary index), 
secondary index modification
+                 * callback is given. This is still correct since metadata 
index operation doesn't require
+                 * any lock from ConcurrentLockMgr.
+                 */
+                return new 
SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId(),
+                        metadataIndex.getPrimaryKeyIndexes(), txnCtx, 
transactionSubsystem.getLockManager(),
+                        transactionSubsystem, metadataIndex.getResourceId(), 
metadataStoragePartition,
+                        ResourceType.LSM_BTREE, indexOp);
+            case UPSERT:
+                return new 
UpsertOperationCallback(metadataIndex.getDatasetId(), 
metadataIndex.getPrimaryKeyIndexes(),
+                        txnCtx, transactionSubsystem.getLockManager(), 
transactionSubsystem,
+                        metadataIndex.getResourceId(), 
metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
+            default:
+                throw new IllegalStateException("Unknown operation type: " + 
indexOp);
+        }
     }
 
     @Override
@@ -822,40 +783,7 @@
 
     private void deleteTupleFromIndex(TxnId txnId, IMetadataIndex 
metadataIndex, ITupleReference tuple)
             throws ACIDException, HyracksDataException {
-        long resourceID = metadataIndex.getResourceId();
-        String resourceName = metadataIndex.getFile().getRelativePath();
-        ILSMIndex lsmIndex = (ILSMIndex) 
datasetLifecycleManager.get(resourceName);
-        try {
-            datasetLifecycleManager.open(resourceName);
-            // prepare a Callback for logging
-            IModificationOperationCallback modCallback =
-                    createIndexModificationCallback(txnId, resourceID, 
metadataIndex, lsmIndex, Operation.DELETE);
-            IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
-            ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
-
-            ITransactionContext txnCtx =
-                    
transactionSubsystem.getTransactionManager().getTransactionContext(txnId, 
false);
-            txnCtx.setWriteTxn(true);
-            txnCtx.registerIndexAndCallback(resourceID, lsmIndex, 
(AbstractOperationCallback) modCallback,
-                    metadataIndex.isPrimaryIndex());
-
-            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, 
transactionSubsystem.getLogManager());
-
-            indexAccessor.forceDelete(tuple);
-            // Manually complete the operation after the insert. This is to 
decrement the
-            // resource counters within the
-            // index that determine how many tuples are still 'in-flight' 
within the index.
-            // Normally the log flusher
-            // does this. The only exception is the index registered as the 
"primary" which
-            // we will let be decremented
-            // by the job commit log event
-            if (!((TransactionContext) 
txnCtx).getPrimaryIndexOpTracker().equals(lsmIndex.getOperationTracker())) {
-                lsmIndex.getOperationTracker().completeOperation(lsmIndex, 
LSMOperationType.FORCE_MODIFICATION, null,
-                        modCallback);
-            }
-        } finally {
-            datasetLifecycleManager.close(resourceName);
-        }
+        modifyMetadataIndex(Operation.DELETE, txnId, metadataIndex, tuple);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 23ad1e1..d3c3fe7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,12 +18,14 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
+import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
@@ -79,10 +81,13 @@
                 try {
                     ITransactionManager txnManager = ((INcApplicationContext) 
jobletContext.getServiceContext()
                             
.getApplicationContext()).getTransactionSubsystem().getTransactionManager();
-                    ITransactionContext txnContext = 
txnManager.getTransactionContext(txnId, false);
+                    ITransactionContext txnContext = 
txnManager.getTransactionContext(txnId);
                     txnContext.setWriteTxn(transactionalWrite);
-                    txnManager.completedTransaction(txnContext, 
DatasetId.NULL, -1,
-                            !(jobStatus == JobStatus.FAILURE));
+                    if (jobStatus != JobStatus.FAILURE) {
+                        txnManager.commitTransaction(txnId);
+                    } else {
+                        txnManager.abortTransaction(txnId);
+                    }
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }
@@ -91,8 +96,9 @@
             @Override
             public void jobletStart() {
                 try {
+                    TransactionOptions options = new 
TransactionOptions(AtomicityLevel.ENTITY_LEVEL);
                     ((INcApplicationContext) 
jobletContext.getServiceContext().getApplicationContext())
-                            
.getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, 
true);
+                            
.getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, 
options);
                 } catch (ACIDException e) {
                     throw new Error(e);
                 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index 23c86f3..bfe1925 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -23,9 +23,9 @@
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TransactionOptions;
 import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.job.IJobletEventListener;
@@ -75,10 +75,13 @@
                             ((INcApplicationContext) 
jobletContext.getServiceContext().getApplicationContext())
                                     
.getTransactionSubsystem().getTransactionManager();
                     for (TxnId txnId : txnIds) {
-                        ITransactionContext txnContext = 
txnManager.getTransactionContext(txnId, false);
+                        ITransactionContext txnContext = 
txnManager.getTransactionContext(txnId);
                         txnContext.setWriteTxn(transactionalWrite);
-                        txnManager.completedTransaction(txnContext, 
DatasetId.NULL, -1,
-                                !(jobStatus == JobStatus.FAILURE));
+                        if (jobStatus != JobStatus.FAILURE) {
+                            txnManager.commitTransaction(txnId);
+                        } else {
+                            txnManager.abortTransaction(txnId);
+                        }
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);
@@ -88,9 +91,11 @@
             @Override
             public void jobletStart() {
                 try {
+                    TransactionOptions options =
+                            new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
                     for (TxnId txnId : txnIds) {
                         ((INcApplicationContext) 
jobletContext.getServiceContext().getApplicationContext())
-                                
.getTransactionSubsystem().getTransactionManager().getTransactionContext(txnId, 
true);
+                                
.getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, 
options);
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index 6f7287b..d61e9a0 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -78,7 +78,7 @@
                     ILockManager lockManager = 
appCtx.getTransactionSubsystem().getLockManager();
                     ITransactionManager txnManager = 
appCtx.getTransactionSubsystem().getTransactionManager();
                     // get the local transaction
-                    ITransactionContext txnCtx = 
txnManager.getTransactionContext(txnId, false);
+                    ITransactionContext txnCtx = 
txnManager.getTransactionContext(txnId);
                     // lock the dataset granule
                     lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
                     // flush the dataset synchronously
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 3a2f195..4bbcb58 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -77,7 +77,7 @@
     protected AbstractIndexModificationOperationCallback(DatasetId datasetId, 
int[] primaryKeyFields,
             ITransactionContext txnCtx, ILockManager lockManager, 
ITransactionSubsystem txnSubsystem, long resourceId,
             int resourcePartition, byte resourceType, Operation indexOp) {
-        super(datasetId, primaryKeyFields, txnCtx, lockManager);
+        super(datasetId, resourceId, primaryKeyFields, txnCtx, lockManager);
         this.resourceId = resourceId;
         this.resourceType = resourceType;
         this.indexOp = indexOp;
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index c97fb1b..fe17b39 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -45,10 +45,10 @@
     private final ILogRecord logRecord;
     private int pkHash;
 
-    public LockThenSearchOperationCallback(DatasetId datasetId, int[] 
entityIdFields,
+    public LockThenSearchOperationCallback(DatasetId datasetId, long 
resourceId, int[] entityIdFields,
             ITransactionSubsystem txnSubsystem, ITransactionContext txnCtx,
             IOperatorNodePushable operatorNodePushable) {
-        super(datasetId, entityIdFields, txnCtx, 
txnSubsystem.getLockManager());
+        super(datasetId, resourceId, entityIdFields, txnCtx, 
txnSubsystem.getLockManager());
         this.operatorNodePushable = 
(LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
         this.logManager = txnSubsystem.getLogManager();
         this.logRecord = new LogRecord();
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 3f3dbd9..9f96263 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -49,9 +49,9 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
-            return new LockThenSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields, txnSubsystem, txnCtx,
-                    operatorNodePushable);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+            return new LockThenSearchOperationCallback(new 
DatasetId(datasetId), resourceId, primaryKeyFields,
+                    txnSubsystem, txnCtx, operatorNodePushable);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
index b13a08e..ec776a5 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallback.java
@@ -35,9 +35,9 @@
 public class PrimaryIndexInstantSearchOperationCallback extends 
AbstractOperationCallback
         implements ISearchOperationCallback {
 
-    public PrimaryIndexInstantSearchOperationCallback(DatasetId datasetId, 
int[] entityIdFields,
+    public PrimaryIndexInstantSearchOperationCallback(DatasetId datasetId, 
long resourceId, int[] entityIdFields,
             ILockManager lockManager, ITransactionContext txnCtx) {
-        super(datasetId, entityIdFields, txnCtx, lockManager);
+        super(datasetId, resourceId, entityIdFields, txnCtx, lockManager);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index 93108f9..f9c8e3c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -34,8 +34,8 @@
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.common.ISearchOperationCallback;
 
-public class PrimaryIndexInstantSearchOperationCallbackFactory extends 
AbstractOperationCallbackFactory implements
-        ISearchOperationCallbackFactory {
+public class PrimaryIndexInstantSearchOperationCallbackFactory extends 
AbstractOperationCallbackFactory
+        implements ISearchOperationCallbackFactory {
 
     private static final long serialVersionUID = 1L;
 
@@ -45,15 +45,15 @@
     }
 
     @Override
-    public ISearchOperationCallback createSearchOperationCallback(long 
resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
-            throws HyracksDataException {
+    public ISearchOperationCallback createSearchOperationCallback(long 
resourceId, IHyracksTaskContext ctx,
+            IOperatorNodePushable operatorNodePushable) throws 
HyracksDataException {
         ITransactionSubsystem txnSubsystem = 
txnSubsystemProvider.getTransactionSubsystem(ctx);
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
-            return new PrimaryIndexInstantSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields,
-                    txnSubsystem.getLockManager(), txnCtx);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+            return new PrimaryIndexInstantSearchOperationCallback(new 
DatasetId(datasetId), resourceId,
+                    primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index fb01952..8f5e386 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -70,12 +69,12 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp, operatorNodePushable);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
index a9075d0..961f799 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallback.java
@@ -34,9 +34,9 @@
  */
 public class PrimaryIndexSearchOperationCallback extends 
AbstractOperationCallback implements ISearchOperationCallback {
 
-    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, int[] 
entityIdFields, ILockManager lockManager,
-            ITransactionContext txnCtx) {
-        super(datasetId, entityIdFields, txnCtx, lockManager);
+    public PrimaryIndexSearchOperationCallback(DatasetId datasetId, long 
resourceId, int[] entityIdFields,
+            ILockManager lockManager, ITransactionContext txnCtx) {
+        super(datasetId, resourceId, entityIdFields, txnCtx, lockManager);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 076b0d9..64cbbc9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -51,8 +51,8 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
-            return new PrimaryIndexSearchOperationCallback(new 
DatasetId(datasetId), primaryKeyFields,
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+            return new PrimaryIndexSearchOperationCallback(new 
DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 5882046..3fc42c9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -66,12 +65,12 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, false);
+            txnCtx.register(resource.getId(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw HyracksDataException.create(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
index 108a77e..1b87376 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallback.java
@@ -31,8 +31,8 @@
 public class SecondaryIndexSearchOperationCallback extends 
AbstractOperationCallback
         implements ISearchOperationCallback {
 
-    public SecondaryIndexSearchOperationCallback() {
-        super(DatasetId.NULL, null, null, null);
+    public SecondaryIndexSearchOperationCallback(long resourceId) {
+        super(DatasetId.NULL, resourceId, null, null, null);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
index 0b96164..972668a 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
@@ -32,6 +32,6 @@
     @Override
     public ISearchOperationCallback createSearchOperationCallback(long 
resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
             throws HyracksDataException {
-        return new SecondaryIndexSearchOperationCallback();
+        return new SecondaryIndexSearchOperationCallback(resourceId);
     }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index 79ce788..735d7ea 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -67,12 +66,12 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8a27914..b744606 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -69,11 +68,11 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             IModificationOperationCallback modCallback = new 
TempDatasetIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
                     resource.getId(), aResource.getPartition(), resourceType, 
indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, false);
+            txnCtx.register(resource.getId(), index, modCallback, false);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index dfd3eb1..da4aab8 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -22,7 +22,6 @@
 import org.apache.asterix.common.context.ITransactionSubsystemProvider;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
 import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
@@ -66,11 +65,11 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId), false);
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
             IModificationOperationCallback modCallback = new 
UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
-            txnCtx.registerIndexAndCallback(resource.getId(), index, 
(AbstractOperationCallback) modCallback, true);
+            txnCtx.register(resource.getId(), index, modCallback, true);
             return modCallback;
         } catch (ACIDException e) {
             throw new HyracksDataException(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index fe758e1..672e881 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -83,7 +83,7 @@
     @Override
     public void open() throws HyracksDataException {
         try {
-            transactionContext = 
transactionManager.getTransactionContext(txnId, false);
+            transactionContext = 
transactionManager.getTransactionContext(txnId);
             transactionContext.setWriteTxn(isWriteTransaction);
             ILogMarkerCallback callback = 
TaskUtil.get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx);
             logRecord = new LogRecord(callback);
@@ -111,9 +111,7 @@
                  * active operation count of PrimaryIndexOptracker. By 
maintaining the count correctly and only allowing
                  * flushing when the count is 0, it can guarantee the no-steal 
policy for temporary datasets, too.
                  */
-                // TODO: Fix this for upserts. an upsert tuple right now 
expect to notify the opTracker twice (one for
-                // delete and one for insert)
-                transactionContext.notifyOptracker(false);
+                transactionContext.notifyEntityCommitted();
             } else {
                 tRef.reset(tAccess, t);
                 try {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 3d78ad9..6ebf52c 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -234,27 +234,26 @@
     private void batchUnlock(int beginOffset, int endOffset) throws 
ACIDException {
         if (endOffset > beginOffset) {
             logBufferTailReader.initializeScan(beginOffset, endOffset);
-
             ITransactionContext txnCtx;
-
             LogRecord logRecord = logBufferTailReader.next();
             while (logRecord != null) {
                 if (logRecord.getLogSource() == LogSource.LOCAL) {
                     if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
                         reusableTxnId.setId(logRecord.getTxnId());
                         reusableDatasetId.setId(logRecord.getDatasetId());
-                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, 
false);
+                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
                         
txnSubsystem.getLockManager().unlock(reusableDatasetId, 
logRecord.getPKHashValue(),
                                 LockMode.ANY, txnCtx);
-                        txnCtx.notifyOptracker(false);
+                        txnCtx.notifyEntityCommitted();
                         if 
(txnSubsystem.getTransactionProperties().isCommitProfilerEnabled()) {
                             txnSubsystem.incrementEntityCommitCount();
                         }
+                    } else if (logRecord.getLogType() == LogType.UPDATE) {
+                        reusableTxnId.setId(logRecord.getTxnId());
+                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId);
+                        
txnCtx.notifyUpdateCommitted(logRecord.getResourceId());
                     } else if (logRecord.getLogType() == LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT) {
-                        reusableTxnId.setId(logRecord.getTxnId());
-                        txnCtx = 
txnSubsystem.getTransactionManager().getTransactionContext(reusableTxnId, 
false);
-                        txnCtx.notifyOptracker(true);
                         notifyJobTermination();
                     } else if (logRecord.getLogType() == LogType.FLUSH) {
                         notifyFlushTermination();
@@ -266,7 +265,6 @@
                         notifyReplicationTermination();
                     }
                 }
-
                 logRecord = logBufferTailReader.next();
             }
         }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
new file mode 100644
index 0000000..43fe266
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AbstractTransactionContext.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.asterix.common.context.ITransactionOperationTracker;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public abstract class AbstractTransactionContext implements 
ITransactionContext {
+
+    protected final TxnId txnId;
+    protected final Map<Long, ITransactionOperationTracker> txnOpTrackers;
+    private final AtomicLong firstLSN;
+    private final AtomicLong lastLSN;
+    private final AtomicInteger txnState;
+    private final AtomicBoolean isWriteTxn;
+    private boolean isTimeout = false;
+
+    protected AbstractTransactionContext(TxnId txnId) {
+        this.txnId = txnId;
+        firstLSN = new AtomicLong(-1);
+        lastLSN = new AtomicLong(-1);
+        txnState = new AtomicInteger(ITransactionManager.ACTIVE);
+        isTimeout = false;
+        isWriteTxn = new AtomicBoolean();
+        txnOpTrackers = new HashMap<>();
+    }
+
+    @Override
+    public long getFirstLSN() {
+        return firstLSN.get();
+    }
+
+    @Override
+    public void setLastLSN(long newValue) {
+        firstLSN.compareAndSet(-1, newValue);
+        lastLSN.set(Math.max(lastLSN.get(), newValue));
+    }
+
+    @Override
+    public void setTxnState(int txnState) {
+        this.txnState.set(txnState);
+    }
+
+    @Override
+    public int getTxnState() {
+        return txnState.get();
+    }
+
+    @Override
+    public TxnId getTxnId() {
+        return txnId;
+    }
+
+    @Override
+    public synchronized void setTimeout(boolean isTimeout) {
+        this.isTimeout = isTimeout;
+    }
+
+    @Override
+    public synchronized boolean isTimeout() {
+        return isTimeout;
+    }
+
+    @Override
+    public void setWriteTxn(boolean isWriteTxn) {
+        this.isWriteTxn.set(isWriteTxn);
+    }
+
+    @Override
+    public boolean isWriteTxn() {
+        return isWriteTxn.get();
+    }
+
+    @Override
+    public long getLastLSN() {
+        return lastLSN.get();
+    }
+
+    @Override
+    public void complete() {
+        try {
+            if (txnState.get() == ITransactionManager.ABORTED) {
+                cleanupForAbort();
+            }
+        } finally {
+            synchronized (txnOpTrackers) {
+                txnOpTrackers.forEach((resource, opTracker) -> 
opTracker.afterTransaction(resource));
+            }
+        }
+    }
+
+    @Override
+    public void register(long resourceId, ILSMIndex index, 
IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        synchronized (txnOpTrackers) {
+            if (!txnOpTrackers.containsKey(resourceId)) {
+                final ITransactionOperationTracker txnOpTracker =
+                        (ITransactionOperationTracker) 
index.getOperationTracker();
+                txnOpTrackers.put(resourceId, txnOpTracker);
+                txnOpTracker.beforeTransaction(resourceId);
+            }
+        }
+    }
+
+    public String prettyPrint() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n" + txnId + "\n");
+        sb.append("isWriteTxn: " + isWriteTxn + "\n");
+        sb.append("firstLSN: " + firstLSN.get() + "\n");
+        sb.append("lastLSN: " + lastLSN.get() + "\n");
+        sb.append("TransactionState: " + txnState + "\n");
+        sb.append("isTimeout: " + isTimeout + "\n");
+        return sb.toString();
+    }
+
+    protected abstract void cleanupForAbort();
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
new file mode 100644
index 0000000..2cfc439
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AtomicTransactionContext.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class AtomicTransactionContext extends AbstractTransactionContext {
+
+    private final Map<Long, ILSMOperationTracker> opTrackers = new HashMap<>();
+    private final Map<Long, AtomicInteger> indexPendingOps = new HashMap<>();
+    private final Map<Long, IModificationOperationCallback> callbacks = new 
HashMap<>();
+
+    public AtomicTransactionContext(TxnId txnId) {
+        super(txnId);
+    }
+
+    @Override
+    public void register(long resourceId, ILSMIndex index, 
IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        super.register(resourceId, index, callback, primaryIndex);
+        synchronized (txnOpTrackers) {
+            if (primaryIndex && !opTrackers.containsKey(resourceId)) {
+                opTrackers.put(resourceId, index.getOperationTracker());
+                callbacks.put(resourceId, callback);
+                indexPendingOps.put(resourceId, new AtomicInteger(0));
+            }
+        }
+    }
+
+    @Override
+    public void notifyUpdateCommitted(long resourceId) {
+        try {
+            opTrackers.get(resourceId).completeOperation(null, 
LSMOperationType.MODIFICATION, null,
+                    callbacks.get(resourceId));
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void notifyEntityCommitted() {
+        throw new IllegalStateException("Unexpected entity commit in atomic 
transaction");
+    }
+
+    @Override
+    public void beforeOperation(long resourceId) {
+        indexPendingOps.get(resourceId).incrementAndGet();
+    }
+
+    @Override
+    public void afterOperation(long resourceId) {
+        indexPendingOps.get(resourceId).decrementAndGet();
+    }
+
+    @Override
+    public void cleanupForAbort() {
+        // each opTracker should be cleaned
+        opTrackers.forEach((resId, opTracker) -> 
((PrimaryIndexOperationTracker) opTracker)
+                
.cleanupNumActiveOperationsForAbortedJob(indexPendingOps.get(resId).get()));
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(txnId.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AtomicTransactionContext that = (AtomicTransactionContext) o;
+        return this.txnId.equals(that.txnId);
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
new file mode 100644
index 0000000..12bf5b3
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/EntityLevelTransactionContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class EntityLevelTransactionContext extends AbstractTransactionContext {
+
+    private PrimaryIndexOperationTracker primaryIndexOpTracker;
+    private IModificationOperationCallback primaryIndexCallback;
+    private final AtomicInteger pendingOps;
+
+    public EntityLevelTransactionContext(TxnId txnId) {
+        super(txnId);
+        pendingOps = new AtomicInteger(0);
+    }
+
+    @Override
+    public void register(long resourceId, ILSMIndex index, 
IModificationOperationCallback callback,
+            boolean primaryIndex) {
+        super.register(resourceId, index, callback, primaryIndex);
+        synchronized (txnOpTrackers) {
+            if (primaryIndex && primaryIndexOpTracker == null) {
+                primaryIndexCallback = callback;
+                primaryIndexOpTracker = (PrimaryIndexOperationTracker) 
index.getOperationTracker();
+            }
+        }
+    }
+
+    @Override
+    public void beforeOperation(long resourceId) {
+        pendingOps.incrementAndGet();
+    }
+
+    @Override
+    public void notifyUpdateCommitted(long resourceId) {
+        // no op
+    }
+
+    @Override
+    public void notifyEntityCommitted() {
+        try {
+            primaryIndexOpTracker.completeOperation(null, 
LSMOperationType.MODIFICATION, null, primaryIndexCallback);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    @Override
+    public void afterOperation(long resourceId) {
+        pendingOps.decrementAndGet();
+    }
+
+    @Override
+    protected void cleanupForAbort() {
+        if (primaryIndexOpTracker != null) {
+            
primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(pendingOps.get());
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return Long.hashCode(txnId.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        EntityLevelTransactionContext that = (EntityLevelTransactionContext) o;
+        return this.txnId.equals(that.txnId);
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
deleted file mode 100644
index c408f1d..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/FieldsHashValueGenerator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.asterix.transaction.management.service.transaction;
-
-import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public class FieldsHashValueGenerator {
-    public static int computeFieldsHashValue(ITupleReference tuple, int[] 
fieldIndexes,
-            IBinaryHashFunction[] fieldHashFunctions) throws 
HyracksDataException {
-        int h = 0;
-        for (int i = 0; i < fieldIndexes.length; i++) {
-            int primaryKeyFieldIdx = fieldIndexes[i];
-            int fh = 
fieldHashFunctions[i].hash(tuple.getFieldData(primaryKeyFieldIdx),
-                    tuple.getFieldStart(primaryKeyFieldIdx), 
tuple.getFieldLength(primaryKeyFieldIdx));
-            h = h * 31 + fh;
-            if (h < 0) {
-                h = h * (-1);
-            }
-        }
-        return h;
-    }
-}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
deleted file mode 100644
index c8e134c..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/MutableResourceId.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-public class MutableResourceId {
-    long id;
-
-    public MutableResourceId(long id) {
-        this.id = id;
-    }
-
-    public void setId(long id) {
-        this.id = id;
-    }
-
-    public long getId() {
-        return id;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) id;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if ((o == null) || !(o instanceof MutableResourceId)) {
-            return false;
-        }
-        return ((MutableResourceId) o).id == this.id;
-    }
-}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
deleted file mode 100644
index 1681a27..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.asterix.common.context.ITransactionOperationTracker;
-import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.AbstractOperationCallback;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.LogRecord;
-import org.apache.asterix.common.transactions.TxnId;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import org.apache.hyracks.storage.common.IModificationOperationCallback;
-
-/*
- * An object of TransactionContext is created and accessed(read/written) by 
multiple threads which work for
- * a single job identified by a txnId. Thus, the member variables in the 
object can be read/written
- * concurrently. Please see each variable declaration to know which one is 
accessed concurrently and
- * which one is not.
- */
-public class TransactionContext implements ITransactionContext {
-
-    private static final long serialVersionUID = -6105616785783310111L;
-
-    // txnId is set once and read concurrently.
-    private final TxnId txnId;
-
-    // There are no concurrent writers on both firstLSN and lastLSN
-    // since both values are updated by serialized log appenders.
-    // But readers and writers can be different threads,
-    // so both LSNs are atomic variables in order to be read and written
-    // atomically.
-    private final AtomicLong firstLSN;
-    private final AtomicLong lastLSN;
-
-    // txnState is read and written concurrently.
-    private final AtomicInteger txnState;
-
-    // isTimeout is read and written under the lockMgr's tableLatch
-    // Thus, no other synchronization is required separately.
-    private boolean isTimeout;
-
-    // isWriteTxn can be set concurrently by multiple threads.
-    private final AtomicBoolean isWriteTxn;
-
-    // isMetadataTxn is accessed by a single thread since the metadata is not
-    // partitioned
-    private boolean isMetadataTxn;
-
-    // indexMap is concurrently accessed by multiple threads,
-    // so those threads are synchronized on indexMap object itself
-    private final Map<Long, ITransactionOperationTracker> indexMap;
-
-    // TODO: fix ComponentLSNs' issues.
-    // primaryIndex, primaryIndexCallback, and primaryIndexOptracker will be
-    // modified accordingly
-    // when the issues of componentLSNs are fixed.
-    private ILSMIndex primaryIndex;
-    private AbstractOperationCallback primaryIndexCallback;
-    private PrimaryIndexOperationTracker primaryIndexOpTracker;
-
-    // The following three variables are used as temporary variables in order 
to
-    // avoid object creations.
-    // Those are used in synchronized methods.
-    private final LogRecord logRecord;
-
-    private final AtomicInteger transactorNumActiveOperations;
-
-    // TODO: implement transactionContext pool in order to avoid object
-    // creations.
-    // also, the pool can throttle the number of concurrent active jobs at 
every
-    // moment.
-    public TransactionContext(TxnId txnId) throws ACIDException {
-        this.txnId = txnId;
-        firstLSN = new AtomicLong(-1);
-        lastLSN = new AtomicLong(-1);
-        txnState = new AtomicInteger(ITransactionManager.ACTIVE);
-        isTimeout = false;
-        isWriteTxn = new AtomicBoolean(false);
-        isMetadataTxn = false;
-        indexMap = new HashMap<>();
-        primaryIndex = null;
-        logRecord = new LogRecord();
-        transactorNumActiveOperations = new AtomicInteger(0);
-    }
-
-    @Override
-    public void registerIndexAndCallback(long resourceId, ILSMIndex index, 
AbstractOperationCallback callback,
-            boolean isPrimaryIndex) {
-        synchronized (indexMap) {
-            if (isPrimaryIndex && primaryIndex == null) {
-                primaryIndex = index;
-                primaryIndexCallback = callback;
-                primaryIndexOpTracker = (PrimaryIndexOperationTracker) 
index.getOperationTracker();
-            }
-            if (!indexMap.containsKey(resourceId)) {
-                final ITransactionOperationTracker txnOpTracker =
-                        (ITransactionOperationTracker) 
index.getOperationTracker();
-                indexMap.put(resourceId, txnOpTracker);
-                txnOpTracker.beforeTransaction(resourceId);
-            }
-        }
-    }
-
-    public PrimaryIndexOperationTracker getPrimaryIndexOpTracker() {
-        synchronized (indexMap) {
-            return primaryIndexOpTracker;
-        }
-    }
-
-    // [Notice]
-    // This method is called sequentially by the LogAppender threads.
-    @Override
-    public void setLastLSN(long LSN) {
-        firstLSN.compareAndSet(-1, LSN);
-        lastLSN.set(Math.max(lastLSN.get(), LSN));
-    }
-
-    @Override
-    public void notifyOptracker(boolean isJobLevelCommit) {
-        try {
-            /**
-             * in case of transaction abort {@link 
TransactionContext#cleanupForAbort()} will
-             * clean the primaryIndexOpTracker state.
-             */
-            if (isJobLevelCommit && isMetadataTxn && txnState.get() != 
ITransactionManager.ABORTED) {
-                primaryIndexOpTracker.exclusiveJobCommitted();
-            } else if (!isJobLevelCommit) {
-                primaryIndexOpTracker.completeOperation(null, 
LSMOperationType.MODIFICATION, null,
-                        (IModificationOperationCallback) primaryIndexCallback);
-            }
-        } catch (HyracksDataException e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-    @Override
-    public void setWriteTxn(boolean isWriteTxn) {
-        this.isWriteTxn.set(isWriteTxn);
-    }
-
-    @Override
-    public boolean isWriteTxn() {
-        return isWriteTxn.get();
-    }
-
-    @Override
-    public long getFirstLSN() {
-        return firstLSN.get();
-    }
-
-    @Override
-    public long getLastLSN() {
-        return lastLSN.get();
-    }
-
-    @Override
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
-    @Override
-    public void setTimeout(boolean isTimeout) {
-        this.isTimeout = isTimeout;
-    }
-
-    @Override
-    public boolean isTimeout() {
-        return isTimeout;
-    }
-
-    @Override
-    public void setTxnState(int txnState) {
-        this.txnState.set(txnState);
-    }
-
-    @Override
-    public int getTxnState() {
-        return txnState.get();
-    }
-
-    @Override
-    public int hashCode() {
-        return Long.hashCode(txnId.getId());
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        return (o == this);
-    }
-
-    @Override
-    public void setMetadataTransaction(boolean isMetadataTxn) {
-        this.isMetadataTxn = isMetadataTxn;
-    }
-
-    @Override
-    public boolean isMetadataTransaction() {
-        return isMetadataTxn;
-    }
-
-    @Override
-    public String prettyPrint() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("\n" + txnId + "\n");
-        sb.append("isWriteTxn: " + isWriteTxn + "\n");
-        sb.append("firstLSN: " + firstLSN.get() + "\n");
-        sb.append("lastLSN: " + lastLSN.get() + "\n");
-        sb.append("TransactionState: " + txnState + "\n");
-        sb.append("isTimeout: " + isTimeout + "\n");
-        return sb.toString();
-    }
-
-    public LogRecord getLogRecord() {
-        return logRecord;
-    }
-
-    @Override
-    public void incrementNumActiveOperations() {
-        transactorNumActiveOperations.incrementAndGet();
-    }
-
-    @Override
-    public void decrementNumActiveOperations() {
-        transactorNumActiveOperations.decrementAndGet();
-    }
-
-    @Override
-    public void complete() {
-        try {
-            if (txnState.get() == ITransactionManager.ABORTED) {
-                cleanupForAbort();
-            }
-        } finally {
-            synchronized (indexMap) {
-                indexMap.forEach((resource, opTracker) -> 
opTracker.afterTransaction(resource));
-            }
-        }
-    }
-
-    private void cleanupForAbort() {
-        if (primaryIndexOpTracker != null) {
-            
primaryIndexOpTracker.cleanupNumActiveOperationsForAbortedJob(transactorNumActiveOperations.get());
-        }
-    }
-}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
new file mode 100644
index 0000000..3da100c
--- /dev/null
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContextFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.transaction;
+
+import static 
org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
+
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
+
+public class TransactionContextFactory {
+
+    public static ITransactionContext create(TxnId txnId, TransactionOptions 
options) {
+        final AtomicityLevel atomicityLevel = options.getAtomicityLevel();
+        switch (atomicityLevel) {
+            case ATOMIC:
+                return new AtomicTransactionContext(txnId);
+            case ENTITY_LEVEL:
+                return new EntityLevelTransactionContext(txnId);
+            default:
+                throw new IllegalStateException("Unknown transaction context 
type: " + atomicityLevel);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index 1799ea1..499c5b7 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.transaction.management.service.transaction;
 
+import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Map;
 import java.util.Set;
@@ -27,124 +28,97 @@
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.TransactionOptions;
+import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.TransactionUtil;
 import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
+import org.apache.hyracks.util.annotations.ThreadSafe;
 
 /**
  * An implementation of the @see ITransactionManager interface that provides
  * implementation of APIs for governing the lifecycle of a transaction.
  */
+@ThreadSafe
 public class TransactionManager implements ITransactionManager, 
ILifeCycleComponent {
 
-    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = 
Logger.getLogger(TransactionManager.class.getName());
     private final ITransactionSubsystem txnSubsystem;
-    private Map<TxnId, ITransactionContext> transactionContextRepository = new 
ConcurrentHashMap<>();
-    private AtomicLong maxTxnId = new AtomicLong(0);
+    private final Map<TxnId, ITransactionContext> txnCtxRepository = new 
ConcurrentHashMap<>();
+    private final AtomicLong maxTxnId = new AtomicLong(0);
 
     public TransactionManager(ITransactionSubsystem provider) {
         this.txnSubsystem = provider;
     }
 
     @Override
-    public void abortTransaction(ITransactionContext txnCtx, DatasetId 
datasetId, int PKHashVal) throws ACIDException {
-        if (txnCtx.getTxnState() != ITransactionManager.ABORTED) {
-            txnCtx.setTxnState(ITransactionManager.ABORTED);
+    public synchronized ITransactionContext beginTransaction(TxnId txnId, 
TransactionOptions options)
+            throws ACIDException {
+        ITransactionContext txnCtx = txnCtxRepository.get(txnId);
+        if (txnCtx != null) {
+            throw new ACIDException("Transaction with the same (" + txnId + ") 
already exists");
         }
-        try {
-            if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = ((TransactionContext) 
txnCtx).getLogRecord();
-                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, 
false);
-                txnSubsystem.getLogManager().log(logRecord);
-                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
-            }
-        } catch (Exception ae) {
-            String msg = "Could not complete rollback! System is in an 
inconsistent state";
-            if (LOGGER.isLoggable(Level.SEVERE)) {
-                LOGGER.severe(msg);
-            }
-            ae.printStackTrace();
-            throw new ACIDException(msg, ae);
-        } finally {
-            txnCtx.complete();
-            txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getTxnId());
-        }
+        txnCtx = TransactionContextFactory.create(txnId, options);
+        txnCtxRepository.put(txnId, txnCtx);
+        ensureMaxTxnId(txnId.getId());
+        return txnCtx;
     }
 
     @Override
-    public ITransactionContext beginTransaction(TxnId txnId) throws 
ACIDException {
-        return getTransactionContext(txnId, true);
-    }
-
-    @Override
-    public ITransactionContext getTransactionContext(TxnId txnId, boolean 
createIfNotExist) throws ACIDException {
-        setMaxTxnId(txnId.getId());
-        ITransactionContext txnCtx = transactionContextRepository.get(txnId);
+    public ITransactionContext getTransactionContext(TxnId txnId) throws 
ACIDException {
+        ITransactionContext txnCtx = txnCtxRepository.get(txnId);
         if (txnCtx == null) {
-            if (createIfNotExist) {
-                synchronized (this) {
-                    txnCtx = transactionContextRepository.get(txnId);
-                    if (txnCtx == null) {
-                        txnCtx = new TransactionContext(txnId);
-                        transactionContextRepository.put(txnId, txnCtx);
-                    }
-                }
-            } else {
-                throw new ACIDException("TransactionContext of " + txnId + " 
doesn't exist.");
-            }
+            throw new ACIDException("Transaction " + txnId + " doesn't 
exist.");
         }
         return txnCtx;
     }
 
     @Override
-    public void commitTransaction(ITransactionContext txnCtx, DatasetId 
datasetId, int PKHashVal)
-            throws ACIDException {
-        //Only job-level commits call this method.
+    public void commitTransaction(TxnId txnId) throws ACIDException {
+        final ITransactionContext txnCtx = getTransactionContext(txnId);
         try {
             if (txnCtx.isWriteTxn()) {
-                LogRecord logRecord = ((TransactionContext) 
txnCtx).getLogRecord();
+                LogRecord logRecord = new LogRecord();
                 TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, 
true);
                 txnSubsystem.getLogManager().log(logRecord);
+                txnCtx.setTxnState(ITransactionManager.COMMITTED);
             }
-        } catch (Exception ae) {
+        } catch (Exception e) {
             if (LOGGER.isLoggable(Level.SEVERE)) {
                 LOGGER.severe(" caused exception in commit !" + 
txnCtx.getTxnId());
             }
-            throw ae;
+            throw e;
         } finally {
             txnCtx.complete();
             txnSubsystem.getLockManager().releaseLocks(txnCtx);
-            transactionContextRepository.remove(txnCtx.getTxnId());
-            txnCtx.setTxnState(ITransactionManager.COMMITTED);
+            txnCtxRepository.remove(txnCtx.getTxnId());
         }
     }
 
     @Override
-    public void completedTransaction(ITransactionContext txnContext, DatasetId 
datasetId, int PKHashVal,
-            boolean success) throws ACIDException {
-        if (!success) {
-            abortTransaction(txnContext, datasetId, PKHashVal);
-        } else {
-            commitTransaction(txnContext, datasetId, PKHashVal);
-        }
-    }
-
-    @Override
-    public ITransactionSubsystem getTransactionSubsystem() {
-        return txnSubsystem;
-    }
-
-    public void setMaxTxnId(long txnId) {
-        long maxId = maxTxnId.get();
-        if (txnId > maxId) {
-            maxTxnId.compareAndSet(maxId, txnId);
+    public void abortTransaction(TxnId txnId) throws ACIDException {
+        final ITransactionContext txnCtx = getTransactionContext(txnId);
+        try {
+            if (txnCtx.isWriteTxn()) {
+                LogRecord logRecord = new LogRecord();
+                TransactionUtil.formJobTerminateLogRecord(txnCtx, logRecord, 
false);
+                txnSubsystem.getLogManager().log(logRecord);
+                txnSubsystem.getRecoveryManager().rollbackTransaction(txnCtx);
+                txnCtx.setTxnState(ITransactionManager.ABORTED);
+            }
+        } catch (ACIDException e) {
+            String msg = "Could not complete rollback! System is in an 
inconsistent state";
+            if (LOGGER.isLoggable(Level.SEVERE)) {
+                LOGGER.log(Level.SEVERE, msg, e);
+            }
+            throw new ACIDException(msg, e);
+        } finally {
+            txnCtx.complete();
+            txnSubsystem.getLockManager().releaseLocks(txnCtx);
+            txnCtxRepository.remove(txnCtx.getTxnId());
         }
     }
 
@@ -167,45 +141,41 @@
 
     @Override
     public void dumpState(OutputStream os) {
-        //#. dump TxnContext
         dumpTxnContext(os);
+    }
+
+    private void ensureMaxTxnId(long txnId) {
+        maxTxnId.updateAndGet(current -> Math.max(current, txnId));
     }
 
     private void dumpTxnContext(OutputStream os) {
         TxnId txnId;
         ITransactionContext txnCtx;
         StringBuilder sb = new StringBuilder();
-
         try {
             sb.append("\n>>dump_begin\t>>----- [ConfVars] -----");
-            Set<Map.Entry<TxnId, ITransactionContext>> entrySet = 
transactionContextRepository.entrySet();
-            if (entrySet != null) {
-                for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) {
-                    if (entry != null) {
-                        txnId = entry.getKey();
-                        if (txnId != null) {
-                            sb.append("\n" + txnId);
-                        } else {
-                            sb.append("\nJID:null");
-                        }
+            Set<Map.Entry<TxnId, ITransactionContext>> entrySet = 
txnCtxRepository.entrySet();
+            for (Map.Entry<TxnId, ITransactionContext> entry : entrySet) {
+                if (entry != null) {
+                    txnId = entry.getKey();
+                    if (txnId != null) {
+                        sb.append("\n" + txnId);
+                    } else {
+                        sb.append("\nJID:null");
+                    }
 
-                        txnCtx = entry.getValue();
-                        if (txnCtx != null) {
-                            sb.append(txnCtx.prettyPrint());
-                        } else {
-                            sb.append("\nTxnCtx:null");
-                        }
+                    txnCtx = entry.getValue();
+                    if (txnCtx != null) {
+                        sb.append(((AbstractTransactionContext) 
txnCtx).prettyPrint());
+                    } else {
+                        sb.append("\nTxnCtx:null");
                     }
                 }
             }
-
             sb.append("\n>>dump_end\t>>----- [ConfVars] -----\n");
             os.write(sb.toString().getBytes());
-        } catch (Exception e) {
-            //ignore exception and continue dumping as much as possible.
-            if (IS_DEBUG_MODE) {
-                e.printStackTrace();
-            }
+        } catch (IOException e) {
+            LOGGER.log(Level.WARNING, "exception while dumping state", e);
         }
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2156
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I13db1c15f8afbdaae608ff0a7468fe62bf1daccd
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>

Reply via email to