Steven Jacobs has submitted this change and it was merged. Change subject: [ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification ......................................................................
[ASTERIXDB-2167][TX][RT] Remove TxnId from JobSpecification - user model changes: no - storage format changes: no - interface changes: IJobEventListenerFactory details: - Remove the TxnId from the compiled job specification - This enables one job spec to be used by multiple jobs/transactions - Runtime operators who need the TxnId will pull it from the EventListener Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2154 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java 28 files changed, 106 insertions(+), 160 deletions(-) Approvals: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; ; Verified Objections: Anon. E. Moose #1000171: Violations found diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java index abd18aa..09092ff 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java @@ -21,7 +21,6 @@ import java.util.List; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -43,12 +42,10 @@ public class CommitPOperator extends AbstractPhysicalOperator { private final List<LogicalVariable> primaryKeyLogicalVars; - private final TxnId txnId; private final Dataset dataset; private final boolean isSink; - public CommitPOperator(TxnId txnId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) { - this.txnId = txnId; + public CommitPOperator(Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, boolean isSink) { this.dataset = dataset; this.primaryKeyLogicalVars = primaryKeyLogicalVars; this.isSink = isSink; @@ -87,7 +84,7 @@ int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]); //get dataset splits - IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, + IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(metadataProvider, primaryKeyFields, isSink); builder.contributeMicroOperator(op, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java index c941320..c3cc0ae 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java @@ -30,7 +30,6 @@ import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod; import org.apache.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType; import org.apache.asterix.optimizer.rules.am.InvertedIndexJobGenParams; -import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -158,7 +157,6 @@ jobSpec, outputRecDesc, queryField, dataflowHelperFactory, queryTokenizerFactory, searchModifierFactory, retainInput, retainMissing, context.getMissingWriterFactory(), dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(), secondaryIndex, - ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getTxnId(), IndexOperation.SEARCH, null), minFilterFieldIndexes, maxFilterFieldIndexes, isFullTextSearchQuery, numPrimaryKeys, propagateIndexFilter); diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java index 61339bf..7dfe161 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java @@ -23,9 +23,7 @@ import org.apache.asterix.algebra.operators.CommitOperator; import org.apache.asterix.algebra.operators.physical.CommitPOperator; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.DatasetDataSource; -import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.commons.lang3.mutable.Mutable; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -99,14 +97,10 @@ primaryKeyLogicalVars.add(new LogicalVariable(varRefExpr.getVariableReference().getId())); } - //get TxnId(TransactorId) - MetadataProvider mp = (MetadataProvider) context.getMetadataProvider(); - TxnId txnId = mp.getTxnId(); - //create the logical and physical operator CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, isSink); CommitPOperator commitPOperator = - new CommitPOperator(txnId, dataset, primaryKeyLogicalVars, isSink); + new CommitPOperator(dataset, primaryKeyLogicalVars, isSink); commitOperator.setPhysicalOperator(commitPOperator); //create ExtensionOperator and put the commitOperator in it. diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index e42b5e5..9dddda4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -279,7 +279,7 @@ Map<ConnectorDescriptorId, ConnectorDescriptorId> connectorIdMapping = new HashMap<>(); Map<OperatorDescriptorId, List<LocationConstraint>> operatorLocations = new HashMap<>(); Map<OperatorDescriptorId, Integer> operatorCounts = new HashMap<>(); - List<TxnId> txnIds = new ArrayList<>(); + Map<Integer, TxnId> txnIdMap = new HashMap<>(); FeedMetaOperatorDescriptor metaOp; for (int iter1 = 0; iter1 < jobsList.size(); iter1++) { @@ -415,11 +415,16 @@ for (OperatorDescriptorId root : subJob.getRoots()) { jobSpec.addRoot(jobSpec.getOperatorMap().get(operatorIdMapping.get(root))); } - txnIds.add(((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId()); + int datasetId = metadataProvider + .findDataset(curFeedConnection.getDataverseName(), curFeedConnection.getDatasetName()) + .getDatasetId(); + TxnId txnId = ((JobEventListenerFactory) subJob.getJobletEventListenerFactory()).getTxnId(datasetId); + txnIdMap.put(datasetId, txnId); } // jobEventListenerFactory - jobSpec.setJobletEventListenerFactory(new MultiTransactionJobletEventListenerFactory(txnIds, true)); + jobSpec.setJobletEventListenerFactory( + new MultiTransactionJobletEventListenerFactory(txnIdMap, true)); // useConnectorSchedulingPolicy jobSpec.setUseConnectorPolicyForScheduling(jobsList.get(0).isUseConnectorPolicyForScheduling()); // connectorAssignmentPolicy diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java index 42f577f..a04c994 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/RebalanceUtil.java @@ -283,13 +283,13 @@ IOperatorDescriptor starter = DatasetUtil.createDummyKeyProviderOp(spec, source, metadataProvider); // Creates primary index scan op. - IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source, txnId); + IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, source); // Creates secondary BTree upsert op. IOperatorDescriptor upsertOp = createPrimaryIndexUpsertOp(spec, metadataProvider, source, target); // The final commit operator. - IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, txnId, target); + IOperatorDescriptor commitOp = createUpsertCommitOp(spec, metadataProvider, target); // Connects empty-tuple-source and scan. spec.connect(new OneToOneConnectorDescriptor(spec), starter, 0, primaryScanOp, 0); @@ -326,11 +326,11 @@ // Creates the commit operator for populating the target dataset. private static IOperatorDescriptor createUpsertCommitOp(JobSpecification spec, MetadataProvider metadataProvider, - TxnId txnId, Dataset target) throws AlgebricksException { + Dataset target) throws AlgebricksException { int[] primaryKeyFields = getPrimaryKeyPermutationForUpsert(target); return new AlgebricksMetaOperatorDescriptor(spec, 1, 0, new IPushRuntimeFactory[] { - target.getCommitRuntimeFactory(metadataProvider, txnId, primaryKeyFields, true) }, + target.getCommitRuntimeFactory(metadataProvider, primaryKeyFields, true) }, new RecordDescriptor[] { target.getPrimaryRecordDescriptor(metadataProvider) }); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index 352a5f8..a1c2ee6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -183,7 +183,7 @@ mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); IndexOperation op = IndexOperation.INSERT; IModificationOperationCallbackFactory modOpCallbackFactory = - new PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx), dataset.getDatasetId(), + new PrimaryIndexModificationOperationCallbackFactory(dataset.getDatasetId(), primaryIndexInfo.primaryKeyIndexes, TXN_SUBSYSTEM_PROVIDER, Operation.get(op), ResourceType.LSM_BTREE); IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); @@ -614,9 +614,9 @@ PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType, mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes); + storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes); ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, primaryIndexInfo.index, getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes); + storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes); IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java index 0f37b13..acb3ae8 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IJobEventListenerFactory.java @@ -22,9 +22,9 @@ import org.apache.hyracks.api.job.IJobletEventListenerFactory; /** - * an interface for JobEventListenerFactories to add Asterix transaction JobId getter + * an interface for JobEventListenerFactories to add Asterix txnId getter */ public interface IJobEventListenerFactory extends IJobletEventListenerFactory { - TxnId getTxnId(TxnId compiledTxnId); + TxnId getTxnId(int datasetId); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java index d2b1276..ce6671b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java @@ -26,15 +26,13 @@ public abstract class AbstractOperationCallbackFactory implements Serializable { private static final long serialVersionUID = 1L; - protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; protected final ITransactionSubsystemProvider txnSubsystemProvider; protected final byte resourceType; - public AbstractOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, + public AbstractOperationCallbackFactory(int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - this.txnId = txnId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.txnSubsystemProvider = txnSubsystemProvider; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 305cdfa..1e0d597 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -84,7 +84,6 @@ import org.apache.asterix.om.utils.NonTaggedFormatUtil; import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; import org.apache.asterix.runtime.formats.FormatUtils; -import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor; import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; @@ -446,7 +445,7 @@ } ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, theIndex, txnId, IndexOperation.SEARCH, primaryKeyFields); + storageComponentProvider, theIndex, IndexOperation.SEARCH, primaryKeyFields); IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); BTreeSearchOperatorDescriptor btreeSearchOp; @@ -485,7 +484,7 @@ } ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, secondaryIndex, txnId, IndexOperation.SEARCH, primaryKeyFields); + storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, primaryKeyFields); RTreeSearchOperatorDescriptor rtreeSearchOp; IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); @@ -789,7 +788,7 @@ // files index RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); ISearchOperationCallbackFactory searchOpCallbackFactory = dataset - .getSearchCallbackFactory(storageComponentProvider, fileIndex, txnId, IndexOperation.SEARCH, null); + .getSearchCallbackFactory(storageComponentProvider, fileIndex, IndexOperation.SEARCH, null); // Create the operator ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory, @@ -959,7 +958,7 @@ primaryKeyFields[i] = i; } IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndex, txnId, indexOp, primaryKeyFields); + storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1081,9 +1080,8 @@ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1179,9 +1177,8 @@ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - TxnId planTxnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, planTxnId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; @@ -1289,9 +1286,8 @@ getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); // prepare callback - TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, secondaryIndex, txnId, indexOp, modificationCallbackPrimaryKeyFields); + storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory( storageComponentProvider.getStorageManager(), splitsAndConstraint.first); IOperatorDescriptor op; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index e6c0de8..2386d77 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -42,7 +42,6 @@ import org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory; import org.apache.asterix.common.metadata.IDataset; import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.common.utils.JobUtils; import org.apache.asterix.common.utils.JobUtils.ProgressState; import org.apache.asterix.external.feed.management.FeedConnectionId; @@ -541,8 +540,6 @@ * * @param index * the index - * @param txnId - * the job id being compiled * @param op * the operation this search is part of * @param primaryKeyFields @@ -553,21 +550,21 @@ * if the callback factory could not be created */ public ISearchOperationCallbackFactory getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider, - Index index, TxnId txnId, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { + Index index, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { if (index.isPrimaryIndex()) { /* * Due to the read-committed isolation level, * we may acquire very short duration lock(i.e., instant lock) for readers. */ - return (op == IndexOperation.UPSERT) ? - new LockThenSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, - storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) : - new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, + return (op == IndexOperation.UPSERT) + ? new LockThenSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields, + storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE) + : new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields, storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE); } else if (index.getKeyFieldNames().isEmpty()) { // this is the case where the index is secondary primary index and locking is required // since the secondary primary index replaces the dataset index (which locks) - return new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, + return new PrimaryIndexInstantSearchOperationCallbackFactory(getDatasetId(), primaryKeyFields, storageComponentProvider.getTransactionSubsystemProvider(), ResourceType.LSM_BTREE); } return new SecondaryIndexSearchOperationCallbackFactory(); @@ -578,8 +575,6 @@ * * @param index * the index - * @param txnId - * the job id of the job being compiled * @param op * the operation performed for this callback * @param primaryKeyFields @@ -590,24 +585,23 @@ * If the callback factory could not be created */ public IModificationOperationCallbackFactory getModificationCallbackFactory( - IStorageComponentProvider componentProvider, Index index, TxnId txnId, IndexOperation op, + IStorageComponentProvider componentProvider, Index index, IndexOperation op, int[] primaryKeyFields) throws AlgebricksException { if (index.isPrimaryIndex()) { - return op == IndexOperation.UPSERT ? - new UpsertOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, + return op == IndexOperation.UPSERT ? new UpsertOperationCallbackFactory(getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), - index.resourceType()) : - op == IndexOperation.DELETE || op == IndexOperation.INSERT ? - new PrimaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), + index.resourceType()) + : op == IndexOperation.DELETE || op == IndexOperation.INSERT + ? new PrimaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), - Operation.get(op), index.resourceType()) : - NoOpOperationCallbackFactory.INSTANCE; + Operation.get(op), index.resourceType()) + : NoOpOperationCallbackFactory.INSTANCE; } else { - return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT ? - new SecondaryIndexModificationOperationCallbackFactory(txnId, getDatasetId(), primaryKeyFields, + return op == IndexOperation.DELETE || op == IndexOperation.INSERT || op == IndexOperation.UPSERT + ? new SecondaryIndexModificationOperationCallbackFactory(getDatasetId(), primaryKeyFields, componentProvider.getTransactionSubsystemProvider(), Operation.get(op), - index.resourceType()) : - NoOpOperationCallbackFactory.INSTANCE; + index.resourceType()) + : NoOpOperationCallbackFactory.INSTANCE; } } @@ -651,8 +645,6 @@ * * @param metadataProvider, * the metadata provider. - * @param txnId, - * the AsterixDB job id for transaction management. * @param primaryKeyFieldPermutation, * the primary key field permutation according to the input. * @param isSink, @@ -660,10 +652,10 @@ * @return the commit runtime factory for inserting/upserting/deleting operations on this dataset. * @throws AlgebricksException */ - public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, TxnId txnId, + public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException { int[] datasetPartitions = getDatasetPartitions(metadataProvider); - return new CommitRuntimeFactory(txnId, datasetId, primaryKeyFieldPermutation, + return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(), datasetPartitions, isSink); } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java index 5973c06..3d05c0e 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java @@ -38,7 +38,6 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.formats.base.IDataFormat; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; @@ -57,7 +56,6 @@ import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; -import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor; import org.apache.asterix.runtime.utils.RuntimeUtils; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory; @@ -335,13 +333,11 @@ * the metadata provider. * @param dataset, * the dataset to scan. - * @param txnId, - * the AsterixDB job id for transaction management. * @return a primary index scan operator. * @throws AlgebricksException */ public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider, - Dataset dataset, TxnId txnId) throws AlgebricksException { + Dataset dataset) throws AlgebricksException { Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint = metadataProvider.getSplitProviderAndConstraints(dataset); IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first; @@ -351,8 +347,8 @@ // +Infinity int[] highKeyFields = null; ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; - ISearchOperationCallbackFactory searchCallbackFactory = - new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(), + ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory( + dataset.getDatasetId(), dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider, IRecoveryManager.ResourceType.LSM_BTREE); IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( @@ -396,7 +392,6 @@ metadataProvider.getSplitProviderAndConstraints(dataset); // prepare callback - TxnId txnId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getTxnId(null); int[] primaryKeyFields = new int[numKeys]; for (int i = 0; i < numKeys; i++) { primaryKeyFields[i] = i; @@ -405,9 +400,9 @@ metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1; IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider(); IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( - storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields); + storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields); ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( - storageComponentProvider, primaryIndex, txnId, IndexOperation.UPSERT, primaryKeyFields); + storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields); IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); LSMPrimaryUpsertOperatorDescriptor op; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java index e6a24e3..2ebfe78 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.metadata.utils; -import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.*; +import static org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption; import java.util.EnumSet; import java.util.List; @@ -161,13 +161,12 @@ * the metadata provider. * @return the AsterixDB job id for transaction management. */ - public static TxnId bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { + public static void bindJobEventListener(JobSpecification spec, MetadataProvider metadataProvider) { TxnId txnId = TxnIdFactory.create(); metadataProvider.setTxnId(txnId); boolean isWriteTransaction = metadataProvider.isWriteTransaction(); IJobletEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, isWriteTransaction); spec.setJobletEventListenerFactory(jobEventListenerFactory); - return txnId; } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java index 8f70f21..41def96 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryBTreeOperationsHelper.java @@ -22,7 +22,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -129,11 +128,10 @@ // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. - IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, - txnId); + IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; @@ -199,7 +197,6 @@ * ====== ========= ........ ........ */ @Override - @SuppressWarnings("rawtypes") protected void setSecondaryRecDescAndComparators() throws AlgebricksException { int numSecondaryKeys = index.getKeyFieldNames().size(); secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields]; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java index 89bd4b1..7791cad 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedBTreeOperationsHelper.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.asterix.common.config.DatasetConfig.DatasetType; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.metadata.entities.Index; @@ -72,14 +71,14 @@ // only handle internal datasets // Create dummy key provider for feeding the primary index scan. - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider))); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; @@ -124,7 +123,6 @@ } @Override - @SuppressWarnings("rawtypes") protected void setSecondaryRecDescAndComparators() throws AlgebricksException { int numSecondaryKeys = index.getKeyFieldNames().size(); secondaryFieldAccessEvalFactories = new IScalarEvaluatorFactory[numSecondaryKeys + numFilterFields]; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java index 93cc11d..b91d65f 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedInvertedIndexOperationsHelper.java @@ -21,7 +21,6 @@ import org.apache.asterix.common.config.DatasetConfig.IndexType; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -79,7 +78,6 @@ } @Override - @SuppressWarnings("rawtypes") protected void setSecondaryRecDescAndComparators() throws AlgebricksException { int numSecondaryKeys = index.getKeyFieldNames().size(); IndexType indexType = index.getIndexType(); @@ -206,14 +204,14 @@ @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider))); IOperatorDescriptor sourceOp = primaryScanOp; boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java index 1333493..bf5178c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedRTreeOperationsHelper.java @@ -23,7 +23,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.formats.nontagged.TypeTraitProvider; @@ -52,7 +51,6 @@ import org.apache.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor; import org.apache.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory; -@SuppressWarnings("rawtypes") public class SecondaryCorrelatedRTreeOperationsHelper extends SecondaryCorrelatedTreeIndexOperationsHelper { protected IPrimitiveValueProviderFactory[] valueProviderFactories; @@ -184,11 +182,11 @@ // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = createPrimaryIndexScanDiskComponentsOp(spec, metadataProvider, - getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider)), txnId); + getTaggedRecordDescriptor(dataset.getPrimaryRecordDescriptor(metadataProvider))); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java index 2a4a952..0a772fa 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryCorrelatedTreeIndexOperationsHelper.java @@ -24,7 +24,6 @@ import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -102,7 +101,6 @@ } protected RecordDescriptor getTaggedRecordDescriptor(RecordDescriptor recDescriptor) { - @SuppressWarnings("rawtypes") ISerializerDeserializer[] fields = new ISerializerDeserializer[recDescriptor.getFields().length + NUM_TAG_FIELDS]; ITypeTraits[] traits = null; @@ -273,10 +271,10 @@ } protected IOperatorDescriptor createPrimaryIndexScanDiskComponentsOp(JobSpecification spec, - MetadataProvider metadataProvider, RecordDescriptor outRecDesc, TxnId txnId) throws AlgebricksException { + MetadataProvider metadataProvider, RecordDescriptor outRecDesc) throws AlgebricksException { ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE; - ISearchOperationCallbackFactory searchCallbackFactory = - new PrimaryIndexInstantSearchOperationCallbackFactory(txnId, dataset.getDatasetId(), + ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory( + dataset.getDatasetId(), dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider, IRecoveryManager.ResourceType.LSM_BTREE); IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java index 3626f16..1c9eb74 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryInvertedIndexOperationsHelper.java @@ -22,7 +22,6 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.metadata.entities.Dataset; @@ -82,7 +81,6 @@ } @Override - @SuppressWarnings("rawtypes") protected void setSecondaryRecDescAndComparators() throws AlgebricksException { int numSecondaryKeys = index.getKeyFieldNames().size(); IndexType indexType = index.getIndexType(); @@ -208,14 +206,14 @@ @Override public JobSpecification buildLoadingJobSpec() throws AlgebricksException { JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext()); - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); // Create primary index scan op. IOperatorDescriptor primaryScanOp = - DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, txnId); + DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset); IOperatorDescriptor sourceOp = primaryScanOp; boolean isOverridingKeyFieldTypes = index.isOverridingKeyFieldTypes(); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java index 613df21..8e6e0e9 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryRTreeOperationsHelper.java @@ -23,7 +23,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.external.indexing.IndexingConstants; import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; @@ -58,7 +57,6 @@ import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor; -@SuppressWarnings("rawtypes") public class SecondaryRTreeOperationsHelper extends SecondaryTreeIndexOperationsHelper { protected IPrimitiveValueProviderFactory[] valueProviderFactories; @@ -201,11 +199,10 @@ if (dataset.getDatasetType() == DatasetType.INTERNAL) { // Create dummy key provider for feeding the primary index scan. IOperatorDescriptor keyProviderOp = DatasetUtil.createDummyKeyProviderOp(spec, dataset, metadataProvider); - TxnId txnId = IndexUtil.bindJobEventListener(spec, metadataProvider); + IndexUtil.bindJobEventListener(spec, metadataProvider); // Create primary index scan op. - IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset, - txnId); + IOperatorDescriptor primaryScanOp = DatasetUtil.createPrimaryIndexScanOp(spec, metadataProvider, dataset); // Assign op. IOperatorDescriptor sourceOp = primaryScanOp; diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java index d3c3fe7..0de61ff 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java @@ -48,12 +48,8 @@ this.transactionalWrite = transactionalWrite; } - public TxnId getTxnId() { - return txnId; - } - @Override - public TxnId getTxnId(TxnId compiledTxnId) { + public TxnId getTxnId(int datasetId) { return txnId; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java index bfe1925..656ea09 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/MultiTransactionJobletEventListenerFactory.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.runtime.job.listener; -import java.util.List; +import java.util.Map; import org.apache.asterix.common.api.IJobEventListenerFactory; import org.apache.asterix.common.api.INcApplicationContext; @@ -40,23 +40,22 @@ public class MultiTransactionJobletEventListenerFactory implements IJobEventListenerFactory { private static final long serialVersionUID = 1L; - private final List<TxnId> txnIds; + private final Map<Integer, TxnId> txnIdMap; private final boolean transactionalWrite; - public MultiTransactionJobletEventListenerFactory(List<TxnId> txnIds, boolean transactionalWrite) { - this.txnIds = txnIds; + public MultiTransactionJobletEventListenerFactory(Map<Integer, TxnId> txnIdMap, boolean transactionalWrite) { + this.txnIdMap = txnIdMap; this.transactionalWrite = transactionalWrite; } - //TODO: Enable this factory to be usable for Deployed Jobs @Override - public TxnId getTxnId(TxnId compiledTxnId) { - return compiledTxnId; + public TxnId getTxnId(int datasetId) { + return txnIdMap.get(datasetId); } @Override public IJobletEventListenerFactory copyFactory() { - return new MultiTransactionJobletEventListenerFactory(txnIds, transactionalWrite); + return new MultiTransactionJobletEventListenerFactory(txnIdMap, transactionalWrite); } @Override @@ -74,13 +73,13 @@ ITransactionManager txnManager = ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext()) .getTransactionSubsystem().getTransactionManager(); - for (TxnId txnId : txnIds) { - ITransactionContext txnContext = txnManager.getTransactionContext(txnId); + for (TxnId subTxnId : txnIdMap.values()) { + ITransactionContext txnContext = txnManager.getTransactionContext(subTxnId); txnContext.setWriteTxn(transactionalWrite); if (jobStatus != JobStatus.FAILURE) { - txnManager.commitTransaction(txnId); + txnManager.commitTransaction(subTxnId); } else { - txnManager.abortTransaction(txnId); + txnManager.abortTransaction(subTxnId); } } } catch (ACIDException e) { @@ -93,9 +92,10 @@ try { TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL); - for (TxnId txnId : txnIds) { + for (TxnId subTxnId : txnIdMap.values()) { ((INcApplicationContext) jobletContext.getServiceContext().getApplicationContext()) - .getTransactionSubsystem().getTransactionManager().beginTransaction(txnId, options); + .getTransactionSubsystem().getTransactionManager() + .beginTransaction(subTxnId, options); } } catch (ACIDException e) { throw new Error(e); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java index 9f96263..1346b76 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java @@ -25,7 +25,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,9 +36,9 @@ private static final long serialVersionUID = 1L; - public LockThenSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, + public LockThenSearchOperationCallbackFactory(int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -49,7 +48,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); return new LockThenSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields, txnSubsystem, txnCtx, operatorNodePushable); } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java index f9c8e3c..d4242bf 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java @@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -39,9 +38,9 @@ private static final long serialVersionUID = 1L; - public PrimaryIndexInstantSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, + public PrimaryIndexInstantSearchOperationCallbackFactory(int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -51,7 +50,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); return new PrimaryIndexInstantSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 8f5e386..97fd7ce 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -49,9 +48,9 @@ private static final long serialVersionUID = 1L; private final Operation indexOp; - public PrimaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, + public PrimaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -69,7 +68,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java index 64cbbc9..72e48bf 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java @@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -39,9 +38,9 @@ private static final long serialVersionUID = 1L; - public PrimaryIndexSearchOperationCallbackFactory(TxnId txnId, int datasetId, int[] entityIdFields, + public PrimaryIndexSearchOperationCallbackFactory(int datasetId, int[] entityIdFields, ITransactionSubsystemProvider txnSubsystemProvider, byte resourceType) { - super(txnId, datasetId, entityIdFields, txnSubsystemProvider, resourceType); + super(datasetId, entityIdFields, txnSubsystemProvider, resourceType); } @Override @@ -51,7 +50,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); return new PrimaryIndexSearchOperationCallback(new DatasetId(datasetId), resourceId, primaryKeyFields, txnSubsystem.getLockManager(), txnCtx); } catch (ACIDException e) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 3fc42c9..0c20ee9 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -27,7 +27,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -45,9 +44,9 @@ private static final long serialVersionUID = 1L; private final Operation indexOp; - public SecondaryIndexModificationOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, + public SecondaryIndexModificationOperationCallbackFactory(int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -65,7 +64,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback( new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index da4aab8..c2f512f 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -26,7 +26,6 @@ import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.transactions.TxnId; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.IOperatorNodePushable; @@ -44,9 +43,9 @@ private static final long serialVersionUID = 1L; protected final Operation indexOp; - public UpsertOperationCallbackFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, + public UpsertOperationCallbackFactory(int datasetId, int[] primaryKeyFields, ITransactionSubsystemProvider txnSubsystemProvider, Operation indexOp, byte resourceType) { - super(txnId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); + super(datasetId, primaryKeyFields, txnSubsystemProvider, resourceType); this.indexOp = indexOp; } @@ -65,7 +64,7 @@ try { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); ITransactionContext txnCtx = txnSubsystem.getTransactionManager() - .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(txnId)); + .getTransactionContext(((IJobEventListenerFactory) fact).getTxnId(datasetId)); IModificationOperationCallback modCallback = new UpsertOperationCallback(new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(), aResource.getPartition(), resourceType, indexOp); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java index 91db197..445ad4a 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -20,7 +20,6 @@ package org.apache.asterix.transaction.management.runtime; import org.apache.asterix.common.api.IJobEventListenerFactory; -import org.apache.asterix.common.transactions.TxnId; import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -31,16 +30,14 @@ private static final long serialVersionUID = 1L; - protected final TxnId txnId; protected final int datasetId; protected final int[] primaryKeyFields; protected final boolean isWriteTransaction; protected int[] datasetPartitions; protected final boolean isSink; - public CommitRuntimeFactory(TxnId txnId, int datasetId, int[] primaryKeyFields, boolean isWriteTransaction, + public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) { - this.txnId = txnId; this.datasetId = datasetId; this.primaryKeyFields = primaryKeyFields; this.isWriteTransaction = isWriteTransaction; @@ -56,7 +53,8 @@ @Override public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory(); - return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(txnId), datasetId, primaryKeyFields, - isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + return new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId), datasetId, + primaryKeyFields, isWriteTransaction, + datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/2154 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9526d50b31aebc3bf971d95ba3edf29c0c1066a7 Gerrit-PatchSet: 10 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Steven Jacobs <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Steven Jacobs <[email protected]> Gerrit-Reviewer: Xikui Wang <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
