Steven Jacobs has submitted this change and it was merged.

Change subject: [ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification
......................................................................


[ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification

- user model changes: no
- storage format changes: no
- interface changes: IJobEventListenerFactory

details:
- Remove the TxnId from the compiled job specification
- This enables one job spec to be used by multiple jobs/transactions
- Runtime operators who need the TxnId will pull it from the EventListener

Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2154
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
---
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
M 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
M 
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
28 files changed, 106 insertions(+), 160 deletions(-)

Approvals:
  abdullah alamoudi: Looks good to me, approved
  Jenkins: Verified; No violations found; ; Verified

Objections:
  Anon. E. Moose #1000171: Violations found



diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index abd18aa..09092ff 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -21,7 +21,6 @@
 
 import java.util.List;
 
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -43,12 +42,10 @@
 public class CommitPOperator extends AbstractPhysicalOperator {
 
     private final List<LogicalVariable> primaryKeyLogicalVars;
-    private final TxnId txnId;
     private final Dataset dataset;
     private final boolean isSink;
 
-    public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> 
primaryKeyLogicalVars, boolean isSink) {
-        this.txnId = txnId;
+    public CommitPOperator(Dataset dataset, List<LogicalVariable> 
primaryKeyLogicalVars, boolean isSink) {
         this.dataset = dataset;
         this.primaryKeyLogicalVars = primaryKeyLogicalVars;
         this.isSink = isSink;
@@ -87,7 +84,7 @@
         int[] primaryKeyFields = 
JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]);
 
         //get dataset splits
-        IPushRuntimeFactory runtime = 
dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields,
+        IPushRuntimeFactory runtime = 
dataset.getCommitRuntimeFactory(metadataProvider, primaryKeyFields,
                 isSink);
         builder.contributeMicroOperator(op, runtime, recDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index c941320..c3cc0ae 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -30,7 +30,6 @@
 import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
 import 
org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
 import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -158,7 +157,6 @@
                 jobSpec, outputRecDesc, queryField, dataflowHelperFactory, 
queryTokenizerFactory, searchModifierFactory,
                 retainInput, retainMissing, context.getMissingWriterFactory(),
                 
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
 secondaryIndex,
-                        ((JobEventListenerFactory) 
jobSpec.getJobletEventListenerFactory()).getTxnId(),
                         IndexOperation.SEARCH, null),
                 minFilterFieldIndexes, maxFilterFieldIndexes, 
isFullTextSearchQuery, numPrimaryKeys,
                 propagateIndexFilter);
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 61339bf..7dfe161 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -23,9 +23,7 @@
 
 import org.apache.asterix.algebra.operators.CommitOperator;
 import org.apache.asterix.algebra.operators.physical.CommitPOperator;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -99,14 +97,10 @@
             primaryKeyLogicalVars.add(new 
LogicalVariable(varRefExpr.getVariableReference().getId()));
         }
 
-        //get TxnId(TransactorId)
-        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
-        TxnId txnId = mp.getTxnId();
-
         //create the logical and physical operator
         CommitOperator commitOperator = new 
CommitOperator(primaryKeyLogicalVars, isSink);
         CommitPOperator commitPOperator =
-                new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, 
isSink);
+                new CommitPOperator(dataset, primaryKeyLogicalVars, isSink);
         commitOperator.setPhysicalOperator(commitPOperator);
 
         //create ExtensionOperator and put the commitOperator in it.
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index e42b5e5..9dddda4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -279,7 +279,7 @@
         Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = 
new HashMap<>();
         Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations 
= new HashMap<>();
         Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>();
-        List<TxnId> txnIds = new ArrayList<>();
+        Map<Integer, TxnId> txnIdMap = new HashMap<>();
         FeedMetaOperatorDescriptor metaOp;
 
         for (int iter1 = 0; iter1 < jobsList.size(); iter1++) {
@@ -415,11 +415,16 @@
             for (OperatorDescriptorId root : subJob.getRoots()) {
                 
jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root)));
             }
-            txnIds.add(((JobEventListenerFactory) 
subJob.getJobletEventListenerFactory()).getTxnId());
+            int datasetId = metadataProvider
+                    .findDataset(curFeedConnection.getDataverseName(), 
curFeedConnection.getDatasetName())
+                    .getDatasetId();
+            TxnId txnId = ((JobEventListenerFactory) 
subJob.getJobletEventListenerFactory()).getTxnId(datasetId);
+            txnIdMap.put(datasetId, txnId);
         }
 
         // jobEventListenerFactory
-        jobSpec.setJobletEventListenerFactory(new 
MultiTransactionJobletEventListenerFactory(txnIds, true));
+        jobSpec.setJobletEventListenerFactory(
+                new MultiTransactionJobletEventListenerFactory(txnIdMap, 
true));
         // useConnectorSchedulingPolicy
         
jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling());
         // connectorAssignmentPolicy
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
index 42f577f..a04c994 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java
@@ -283,13 +283,13 @@
         IOperatorDescriptor starter = 
DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider);
 
         // Creates primary index scan op.
-        IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId);
+        IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source);
 
         // Creates secondary BTree upsert op.
         IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, 
metadataProvider, source, target);
 
         // The final commit operator.
-        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, 
metadataProvider, txnId, target);
+        IOperatorDescriptor commitOp = createUpsertCommitOp(spec, 
metadataProvider, target);
 
         // Connects empty-tuple-source and scan.
         spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, 
primaryScanOp, 0);
@@ -326,11 +326,11 @@
 
     // Creates the commit operator for populating the target dataset.
     private static IOperatorDescriptor createUpsertCommitOp(JobSpecification 
spec, MetadataProvider metadataProvider,
-            TxnId txnId, Dataset target) throws AlgebricksException {
+            Dataset target) throws AlgebricksException {
         int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target);
         return new AlgebricksMetaOperatorDescriptor(spec, 1, 0,
                 new IPushRuntimeFactory[] {
-                        target.getCommitRuntimeFactory(metadataProvider, 
txnId, primaryKeyFields, true) },
+                        target.getCommitRuntimeFactory(metadataProvider, 
primaryKeyFields, true) },
                 new RecordDescriptor[] { 
target.getPrimaryRecordDescriptor(metadataProvider) });
     }
 
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 352a5f8..a1c2ee6 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -183,7 +183,7 @@
                     mergePolicy.first, mergePolicy.second, filterFields, 
primaryKeyIndexes, primaryKeyIndicators);
             IndexOperation op = IndexOperation.INSERT;
             IModificationOperationCallbackFactory modOpCallbackFactory =
-                    new 
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), 
dataset.getDatasetId(),
+                    new 
PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(),
                             primaryIndexInfo.primaryKeyIndexes, 
TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
                             ResourceType.LSM_BTREE);
             IRecordDescriptorProvider recordDescProvider = 
primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -614,9 +614,9 @@
         PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, 
keyTypes, recordType, metaType,
                 mergePolicy.first, mergePolicy.second, filterFields, 
keyIndexes, keyIndicators);
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, 
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, 
IndexOperation.UPSERT, keyIndexes);
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndexInfo.index, 
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+                storageComponentProvider, primaryIndexInfo.index, 
IndexOperation.UPSERT, keyIndexes);
         IRecordDescriptorProvider recordDescProvider = 
primaryIndexInfo.getInsertRecordDescriptorProvider();
         IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), 
primaryIndexInfo.getFileSplitProvider());
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
index 0f37b13..acb3ae8 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java
@@ -22,9 +22,9 @@
 import org.apache.hyracks.api.job.IJobletEventListenerFactory;
 
 /**
- * an interface for JobEventListenerFactories to add Asterix transaction JobId 
getter
+ * an interface for JobEventListenerFactories to add Asterix txnId getter
  */
 public interface IJobEventListenerFactory extends IJobletEventListenerFactory {
 
-    TxnId getTxnId(TxnId compiledTxnId);
+    TxnId getTxnId(int datasetId);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index d2b1276..ce6671b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -26,15 +26,13 @@
 public abstract class AbstractOperationCallbackFactory implements Serializable 
{
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final ITransactionSubsystemProvider txnSubsystemProvider;
     protected final byte resourceType;
 
-    public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] 
primaryKeyFields,
+    public AbstractOperationCallbackFactory(int datasetId, int[] 
primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte 
resourceType) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.txnSubsystemProvider = txnSubsystemProvider;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 305cdfa..1e0d597 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -84,7 +84,6 @@
 import org.apache.asterix.om.utils.NonTaggedFormatUtil;
 import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
 import org.apache.asterix.runtime.formats.FormatUtils;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
 import 
org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
@@ -446,7 +445,7 @@
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
-                storageComponentProvider, theIndex, txnId, 
IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, theIndex, IndexOperation.SEARCH, 
primaryKeyFields);
         IStorageManager storageManager = 
getStorageComponentProvider().getStorageManager();
         IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
@@ -485,7 +484,7 @@
         }
 
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
-                storageComponentProvider, secondaryIndex, txnId, 
IndexOperation.SEARCH, primaryKeyFields);
+                storageComponentProvider, secondaryIndex, 
IndexOperation.SEARCH, primaryKeyFields);
         RTreeSearchOperatorDescriptor rtreeSearchOp;
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
spPc.first);
@@ -789,7 +788,7 @@
             // files index
             RecordDescriptor outRecDesc = 
JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
             ISearchOperationCallbackFactory searchOpCallbackFactory = dataset
-                    .getSearchCallbackFactory(storageComponentProvider, 
fileIndex, txnId, IndexOperation.SEARCH, null);
+                    .getSearchCallbackFactory(storageComponentProvider, 
fileIndex, IndexOperation.SEARCH, null);
             // Create the operator
             ExternalLookupOperatorDescriptor op = new 
ExternalLookupOperatorDescriptor(jobSpec, adapterFactory,
                     outRecDesc, indexDataflowHelperFactory, 
searchOpCallbackFactory,
@@ -959,7 +958,7 @@
             primaryKeyFields[i] = i;
         }
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, indexOp, 
primaryKeyFields);
+                storageComponentProvider, primaryIndex, indexOp, 
primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1081,9 +1080,8 @@
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraint =
                     getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, 
modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
             IOperatorDescriptor op;
@@ -1179,9 +1177,8 @@
                 getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
 
         // prepare callback
-        TxnId planTxnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId();
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
-                storageComponentProvider, secondaryIndex, planTxnId, indexOp, 
modificationCallbackPrimaryKeyFields);
+                storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
         IIndexDataflowHelperFactory indexDataflowHelperFactory =
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         IOperatorDescriptor op;
@@ -1289,9 +1286,8 @@
                     getSplitProviderAndConstraints(dataset, 
secondaryIndex.getIndexName());
 
             // prepare callback
-            TxnId txnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId();
             IModificationOperationCallbackFactory modificationCallbackFactory 
= dataset.getModificationCallbackFactory(
-                    storageComponentProvider, secondaryIndex, txnId, indexOp, 
modificationCallbackPrimaryKeyFields);
+                    storageComponentProvider, secondaryIndex, indexOp, 
modificationCallbackPrimaryKeyFields);
             IIndexDataflowHelperFactory indexDataFlowFactory = new 
IndexDataflowHelperFactory(
                     storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
             IOperatorDescriptor op;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index e6c0de8..2386d77 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,7 +42,6 @@
 import 
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
 import org.apache.asterix.common.metadata.IDataset;
 import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.common.utils.JobUtils.ProgressState;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -541,8 +540,6 @@
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id being compiled
      * @param op
      *            the operation this search is part of
      * @param primaryKeyFields
@@ -553,21 +550,21 @@
      *             if the callback factory could not be created
      */
     public ISearchOperationCallbackFactory 
getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
-            Index index, TxnId txnId, IndexOperation op, int[] 
primaryKeyFields) throws AlgebricksException {
+            Index index, IndexOperation op, int[] primaryKeyFields) throws 
AlgebricksException {
         if (index.isPrimaryIndex()) {
             /*
              * Due to the read-committed isolation level,
              * we may acquire very short duration lock(i.e., instant lock) for 
readers.
              */
-            return (op == IndexOperation.UPSERT) ?
-                    new LockThenSearchOperationCallbackFactory(txnId, 
getDatasetId(), primaryKeyFields,
-                            
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE) :
-                    new 
PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), 
primaryKeyFields,
+            return (op == IndexOperation.UPSERT)
+                    ? new 
LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields,
+                            
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE)
+                    : new 
PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), 
primaryKeyFields,
                             
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE);
         } else if (index.getKeyFieldNames().isEmpty()) {
             // this is the case where the index is secondary primary index and 
locking is required
             // since the secondary primary index replaces the dataset index 
(which locks)
-            return new 
PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), 
primaryKeyFields,
+            return new 
PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), 
primaryKeyFields,
                     
storageComponentProvider.getTransactionSubsystemProvider(), 
ResourceType.LSM_BTREE);
         }
         return new SecondaryIndexSearchOperationCallbackFactory();
@@ -578,8 +575,6 @@
      *
      * @param index
      *            the index
-     * @param txnId
-     *            the job id of the job being compiled
      * @param op
      *            the operation performed for this callback
      * @param primaryKeyFields
@@ -590,24 +585,23 @@
      *             If the callback factory could not be created
      */
     public IModificationOperationCallbackFactory 
getModificationCallbackFactory(
-            IStorageComponentProvider componentProvider, Index index, TxnId 
txnId, IndexOperation op,
+            IStorageComponentProvider componentProvider, Index index, 
IndexOperation op,
             int[] primaryKeyFields) throws AlgebricksException {
         if (index.isPrimaryIndex()) {
-            return op == IndexOperation.UPSERT ?
-                    new UpsertOperationCallbackFactory(txnId, getDatasetId(), 
primaryKeyFields,
+            return op == IndexOperation.UPSERT ? new 
UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields,
                             
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    op == IndexOperation.DELETE || op == IndexOperation.INSERT 
?
-                            new 
PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(),
+                            index.resourceType())
+                    : op == IndexOperation.DELETE || op == 
IndexOperation.INSERT
+                            ? new 
PrimaryIndexModificationOperationCallbackFactory(getDatasetId(),
                                     primaryKeyFields, 
componentProvider.getTransactionSubsystemProvider(),
-                                    Operation.get(op), index.resourceType()) :
-                            NoOpOperationCallbackFactory.INSTANCE;
+                                    Operation.get(op), index.resourceType())
+                            : NoOpOperationCallbackFactory.INSTANCE;
         } else {
-            return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT ?
-                    new 
SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), 
primaryKeyFields,
+            return op == IndexOperation.DELETE || op == IndexOperation.INSERT 
|| op == IndexOperation.UPSERT
+                    ? new 
SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), 
primaryKeyFields,
                             
componentProvider.getTransactionSubsystemProvider(), Operation.get(op),
-                            index.resourceType()) :
-                    NoOpOperationCallbackFactory.INSTANCE;
+                            index.resourceType())
+                    : NoOpOperationCallbackFactory.INSTANCE;
         }
     }
 
@@ -651,8 +645,6 @@
      *
      * @param metadataProvider,
      *            the metadata provider.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @param primaryKeyFieldPermutation,
      *            the primary key field permutation according to the input.
      * @param isSink,
@@ -660,10 +652,10 @@
      * @return the commit runtime factory for inserting/upserting/deleting 
operations on this dataset.
      * @throws AlgebricksException
      */
-    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider 
metadataProvider, TxnId txnId,
+    public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider 
metadataProvider,
             int[] primaryKeyFieldPermutation, boolean isSink) throws 
AlgebricksException {
         int[] datasetPartitions = getDatasetPartitions(metadataProvider);
-        return new CommitRuntimeFactory(txnId, datasetId, 
primaryKeyFieldPermutation,
+        return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
                 metadataProvider.isWriteTransaction(), datasetPartitions, 
isSink);
     }
 
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 5973c06..3d05c0e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -38,7 +38,6 @@
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
@@ -57,7 +56,6 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
 import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
 import org.apache.asterix.runtime.utils.RuntimeUtils;
 import 
org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
@@ -335,13 +333,11 @@
      *            the metadata provider.
      * @param dataset,
      *            the dataset to scan.
-     * @param txnId,
-     *            the AsterixDB job id for transaction management.
      * @return a primary index scan operator.
      * @throws AlgebricksException
      */
     public static IOperatorDescriptor 
createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider 
metadataProvider,
-            Dataset dataset, TxnId txnId) throws AlgebricksException {
+            Dataset dataset) throws AlgebricksException {
         Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
primarySplitsAndConstraint =
                 metadataProvider.getSplitProviderAndConstraints(dataset);
         IFileSplitProvider primaryFileSplitProvider = 
primarySplitsAndConstraint.first;
@@ -351,8 +347,8 @@
         // +Infinity
         int[] highKeyFields = null;
         ITransactionSubsystemProvider txnSubsystemProvider = 
TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, 
dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new 
PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), 
txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
@@ -396,7 +392,6 @@
                 metadataProvider.getSplitProviderAndConstraints(dataset);
 
         // prepare callback
-        TxnId txnId = ((JobEventListenerFactory) 
spec.getJobletEventListenerFactory()).getTxnId(null);
         int[] primaryKeyFields = new int[numKeys];
         for (int i = 0; i < numKeys; i++) {
             primaryKeyFields[i] = i;
@@ -405,9 +400,9 @@
                 metadataProvider.getDatasetIndexes(dataset.getDataverseName(), 
dataset.getDatasetName()).size() > 1;
         IStorageComponentProvider storageComponentProvider = 
metadataProvider.getStorageComponentProvider();
         IModificationOperationCallbackFactory modificationCallbackFactory = 
dataset.getModificationCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, 
IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, 
primaryKeyFields);
         ISearchOperationCallbackFactory searchCallbackFactory = 
dataset.getSearchCallbackFactory(
-                storageComponentProvider, primaryIndex, txnId, 
IndexOperation.UPSERT, primaryKeyFields);
+                storageComponentProvider, primaryIndex, IndexOperation.UPSERT, 
primaryKeyFields);
         IIndexDataflowHelperFactory idfh =
                 new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), 
splitsAndConstraint.first);
         LSMPrimaryUpsertOperatorDescriptor op;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index e6a24e3..2ebfe78 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.metadata.utils;
 
-import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*;
+import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
 import java.util.EnumSet;
 import java.util.List;
@@ -161,13 +161,12 @@
      *            the metadata provider.
      * @return the AsterixDB job id for transaction management.
      */
-    public static TxnId bindJobEventListener(JobSpecification spec, 
MetadataProvider metadataProvider) {
+    public static void bindJobEventListener(JobSpecification spec, 
MetadataProvider metadataProvider) {
         TxnId txnId = TxnIdFactory.create();
         metadataProvider.setTxnId(txnId);
         boolean isWriteTransaction = metadataProvider.isWriteTransaction();
         IJobletEventListenerFactory jobEventListenerFactory = new 
JobEventListenerFactory(txnId, isWriteTransaction);
         spec.setJobletEventListenerFactory(jobEventListenerFactory);
-        return txnId;
     }
 
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
index 8f70f21..41def96 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java
@@ -22,7 +22,6 @@
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -129,11 +128,10 @@
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset,
                     metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, 
metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;
@@ -199,7 +197,6 @@
      *      ====== ========= ........ ........
      */
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
index 89bd4b1..7791cad 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.entities.Index;
@@ -72,14 +71,14 @@
 
         // only handle internal datasets
         // Create dummy key provider for feeding the primary index scan.
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)),
 txnId);
+                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;
@@ -124,7 +123,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         secondaryFieldAccessEvalFactories = new 
IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields];
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
index 93cc11d..b91d65f 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java
@@ -21,7 +21,6 @@
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -79,7 +78,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -206,14 +204,14 @@
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)),
 txnId);
+                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
index 1333493..bf5178c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -52,7 +51,6 @@
 import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
 import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryCorrelatedRTreeOperationsHelper extends 
SecondaryCorrelatedTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -184,11 +182,11 @@
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp = 
createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider,
-                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)),
 txnId);
+                
getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)));
 
         // Assign op.
         IOperatorDescriptor sourceOp = primaryScanOp;
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
index 2a4a952..0a772fa 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java
@@ -24,7 +24,6 @@
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.metadata.declared.MetadataProvider;
@@ -102,7 +101,6 @@
     }
 
     protected RecordDescriptor getTaggedRecordDescriptor(RecordDescriptor 
recDescriptor) {
-        @SuppressWarnings("rawtypes")
         ISerializerDeserializer[] fields =
                 new ISerializerDeserializer[recDescriptor.getFields().length + 
NUM_TAG_FIELDS];
         ITypeTraits[] traits = null;
@@ -273,10 +271,10 @@
     }
 
     protected IOperatorDescriptor 
createPrimaryIndexScanDiskComponentsOp(JobSpecification spec,
-            MetadataProvider metadataProvider, RecordDescriptor outRecDesc, 
TxnId txnId) throws AlgebricksException {
+            MetadataProvider metadataProvider, RecordDescriptor outRecDesc) 
throws AlgebricksException {
         ITransactionSubsystemProvider txnSubsystemProvider = 
TransactionSubsystemProvider.INSTANCE;
-        ISearchOperationCallbackFactory searchCallbackFactory =
-                new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, 
dataset.getDatasetId(),
+        ISearchOperationCallbackFactory searchCallbackFactory = new 
PrimaryIndexInstantSearchOperationCallbackFactory(
+                dataset.getDatasetId(),
                         dataset.getPrimaryBloomFilterFields(), 
txnSubsystemProvider,
                         IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
index 3626f16..1c9eb74 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java
@@ -22,7 +22,6 @@
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -82,7 +81,6 @@
     }
 
     @Override
-    @SuppressWarnings("rawtypes")
     protected void setSecondaryRecDescAndComparators() throws 
AlgebricksException {
         int numSecondaryKeys = index.getKeyFieldNames().size();
         IndexType indexType = index.getIndexType();
@@ -208,14 +206,14 @@
     @Override
     public JobSpecification buildLoadingJobSpec() throws AlgebricksException {
         JobSpecification spec = 
RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
-        TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider);
+        IndexUtil.bindJobEventListener(spec, metadataProvider);
 
         // Create dummy key provider for feeding the primary index scan.
         IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
 
         // Create primary index scan op.
         IOperatorDescriptor primaryScanOp =
-                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, 
dataset, txnId);
+                DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, 
dataset);
 
         IOperatorDescriptor sourceOp = primaryScanOp;
         boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
index 613df21..8e6e0e9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java
@@ -23,7 +23,6 @@
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.GlobalConfig;
 import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.IndexingConstants;
 import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
@@ -58,7 +57,6 @@
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
 
-@SuppressWarnings("rawtypes")
 public class SecondaryRTreeOperationsHelper extends 
SecondaryTreeIndexOperationsHelper {
 
     protected IPrimitiveValueProviderFactory[] valueProviderFactories;
@@ -201,11 +199,10 @@
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             // Create dummy key provider for feeding the primary index scan.
             IOperatorDescriptor keyProviderOp = 
DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider);
-            TxnId txnId = IndexUtil.bindJobEventListener(spec, 
metadataProvider);
+            IndexUtil.bindJobEventListener(spec, metadataProvider);
 
             // Create primary index scan op.
-            IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset,
-                    txnId);
+            IOperatorDescriptor primaryScanOp = 
DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset);
 
             // Assign op.
             IOperatorDescriptor sourceOp = primaryScanOp;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index d3c3fe7..0de61ff 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -48,12 +48,8 @@
         this.transactionalWrite = transactionalWrite;
     }
 
-    public TxnId getTxnId() {
-        return txnId;
-    }
-
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
+    public TxnId getTxnId(int datasetId) {
         return txnId;
     }
 
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
index bfe1925..656ea09 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.runtime.job.listener;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -40,23 +40,22 @@
 public class MultiTransactionJobletEventListenerFactory implements 
IJobEventListenerFactory {
 
     private static final long serialVersionUID = 1L;
-    private final List<TxnId> txnIds;
+    private final Map<Integer, TxnId> txnIdMap;
     private final boolean transactionalWrite;
 
-    public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, 
boolean transactionalWrite) {
-        this.txnIds = txnIds;
+    public MultiTransactionJobletEventListenerFactory(Map<Integer, TxnId> 
txnIdMap, boolean transactionalWrite) {
+        this.txnIdMap = txnIdMap;
         this.transactionalWrite = transactionalWrite;
     }
 
-    //TODO: Enable this factory to be usable for Deployed Jobs
     @Override
-    public TxnId getTxnId(TxnId compiledTxnId) {
-        return compiledTxnId;
+    public TxnId getTxnId(int datasetId) {
+        return txnIdMap.get(datasetId);
     }
 
     @Override
     public IJobletEventListenerFactory copyFactory() {
-        return new MultiTransactionJobletEventListenerFactory(txnIds, 
transactionalWrite);
+        return new MultiTransactionJobletEventListenerFactory(txnIdMap, 
transactionalWrite);
     }
 
     @Override
@@ -74,13 +73,13 @@
                     ITransactionManager txnManager =
                             ((INcApplicationContext) 
jobletContext.getServiceContext().getApplicationContext())
                                     
.getTransactionSubsystem().getTransactionManager();
-                    for (TxnId txnId : txnIds) {
-                        ITransactionContext txnContext = 
txnManager.getTransactionContext(txnId);
+                    for (TxnId subTxnId : txnIdMap.values()) {
+                        ITransactionContext txnContext = 
txnManager.getTransactionContext(subTxnId);
                         txnContext.setWriteTxn(transactionalWrite);
                         if (jobStatus != JobStatus.FAILURE) {
-                            txnManager.commitTransaction(txnId);
+                            txnManager.commitTransaction(subTxnId);
                         } else {
-                            txnManager.abortTransaction(txnId);
+                            txnManager.abortTransaction(subTxnId);
                         }
                     }
                 } catch (ACIDException e) {
@@ -93,9 +92,10 @@
                 try {
                     TransactionOptions options =
                             new 
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
-                    for (TxnId txnId : txnIds) {
+                    for (TxnId subTxnId : txnIdMap.values()) {
                         ((INcApplicationContext) 
jobletContext.getServiceContext().getApplicationContext())
-                                
.getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, 
options);
+                                
.getTransactionSubsystem().getTransactionManager()
+                                .beginTransaction(subTxnId, options);
                     }
                 } catch (ACIDException e) {
                     throw new Error(e);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 9f96263..1346b76 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,7 +25,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,9 +36,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, 
int[] entityIdFields,
+    public LockThenSearchOperationCallbackFactory(int datasetId, int[] 
entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte 
resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -49,7 +48,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             return new LockThenSearchOperationCallback(new 
DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem, txnCtx, operatorNodePushable);
         } catch (ACIDException e) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index f9c8e3c..d4242bf 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int 
datasetId, int[] entityIdFields,
+    public PrimaryIndexInstantSearchOperationCallbackFactory(int datasetId, 
int[] entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte 
resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             return new PrimaryIndexInstantSearchOperationCallback(new 
DatasetId(datasetId), resourceId,
                     primaryKeyFields, txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 8f5e386..97fd7ce 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -49,9 +48,9 @@
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int 
datasetId, int[] primaryKeyFields,
+    public PrimaryIndexModificationOperationCallbackFactory(int datasetId, 
int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation 
indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -69,7 +68,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
PrimaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index 64cbbc9..72e48bf 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -39,9 +38,9 @@
 
     private static final long serialVersionUID = 1L;
 
-    public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int 
datasetId, int[] entityIdFields,
+    public PrimaryIndexSearchOperationCallbackFactory(int datasetId, int[] 
entityIdFields,
             ITransactionSubsystemProvider txnSubsystemProvider, byte 
resourceType) {
-        super(txnId, datasetId, entityIdFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, entityIdFields, txnSubsystemProvider, resourceType);
     }
 
     @Override
@@ -51,7 +50,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             return new PrimaryIndexSearchOperationCallback(new 
DatasetId(datasetId), resourceId, primaryKeyFields,
                     txnSubsystem.getLockManager(), txnCtx);
         } catch (ACIDException e) {
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3fc42c9..0c20ee9 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,7 +27,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -45,9 +44,9 @@
     private static final long serialVersionUID = 1L;
     private final Operation indexOp;
 
-    public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int 
datasetId, int[] primaryKeyFields,
+    public SecondaryIndexModificationOperationCallbackFactory(int datasetId, 
int[] primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation 
indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             DatasetLocalResource aResource = (DatasetLocalResource) 
resource.getResource();
             IModificationOperationCallback modCallback = new 
SecondaryIndexModificationOperationCallback(
                     new DatasetId(datasetId), primaryKeyFields, txnCtx, 
txnSubsystem.getLockManager(), txnSubsystem,
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index da4aab8..c2f512f 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -26,7 +26,6 @@
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.transactions.TxnId;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
@@ -44,9 +43,9 @@
     private static final long serialVersionUID = 1L;
     protected final Operation indexOp;
 
-    public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] 
primaryKeyFields,
+    public UpsertOperationCallbackFactory(int datasetId, int[] 
primaryKeyFields,
             ITransactionSubsystemProvider txnSubsystemProvider, Operation 
indexOp, byte resourceType) {
-        super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, 
resourceType);
+        super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
         this.indexOp = indexOp;
     }
 
@@ -65,7 +64,7 @@
         try {
             IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
             ITransactionContext txnCtx = txnSubsystem.getTransactionManager()
-                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(txnId));
+                    .getTransactionContext(((IJobEventListenerFactory) 
fact).getTxnId(datasetId));
             IModificationOperationCallback modCallback = new 
UpsertOperationCallback(new DatasetId(datasetId),
                     primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), 
txnSubsystem, resource.getId(),
                     aResource.getPartition(), resourceType, indexOp);
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 91db197..445ad4a 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -20,7 +20,6 @@
 package org.apache.asterix.transaction.management.runtime;
 
 import org.apache.asterix.common.api.IJobEventListenerFactory;
-import org.apache.asterix.common.transactions.TxnId;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
 import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -31,16 +30,14 @@
 
     private static final long serialVersionUID = 1L;
 
-    protected final TxnId txnId;
     protected final int datasetId;
     protected final int[] primaryKeyFields;
     protected final boolean isWriteTransaction;
     protected int[] datasetPartitions;
     protected final boolean isSink;
 
-    public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] 
primaryKeyFields, boolean isWriteTransaction,
+    public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean 
isWriteTransaction,
             int[] datasetPartitions, boolean isSink) {
-        this.txnId = txnId;
         this.datasetId = datasetId;
         this.primaryKeyFields = primaryKeyFields;
         this.isWriteTransaction = isWriteTransaction;
@@ -56,7 +53,8 @@
     @Override
     public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws 
HyracksDataException {
         IJobletEventListenerFactory fact = 
ctx.getJobletContext().getJobletEventListenerFactory();
-        return new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(txnId), datasetId, primaryKeyFields,
-                isWriteTransaction, 
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
+        return new CommitRuntime(ctx, ((IJobEventListenerFactory) 
fact).getTxnId(datasetId), datasetId,
+                primaryKeyFields, isWriteTransaction,
+                
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
     }
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2154
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7
Gerrit-PatchSet: 10
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Steven Jacobs <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>

Reply via email to