abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1554
Change subject: Fix transaction logs and optimize upserts
......................................................................
Fix transaction logs and optimize upserts
Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
---
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
M asterixdb/asterix-transactions/pom.xml
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
D
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
M
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
M
hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
36 files changed, 118 insertions(+), 219 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/54/1554/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
index d0cee55..20c69c4 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java
@@ -46,15 +46,12 @@
private final List<LogicalVariable> primaryKeyLogicalVars;
private final JobId jobId;
private final Dataset dataset;
- private final LogicalVariable upsertVar;
private final boolean isSink;
- public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable>
primaryKeyLogicalVars,
- LogicalVariable upsertVar, boolean isSink) {
+ public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable>
primaryKeyLogicalVars, boolean isSink) {
this.jobId = jobId;
this.dataset = dataset;
this.primaryKeyLogicalVars = primaryKeyLogicalVars;
- this.upsertVar = upsertVar;
this.isSink = isSink;
}
@@ -98,12 +95,8 @@
for (int i = 0; i < splitsForDataset.length; i++) {
datasetPartitions[i] = i;
}
- int upsertVarIdx = -1;
- if (upsertVar != null) {
- upsertVarIdx = inputSchemas[0].findVariable(upsertVar);
- }
IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId,
primaryKeyFields, metadataProvider,
- upsertVarIdx, datasetPartitions, isSink);
+ datasetPartitions, isSink);
builder.contributeMicroOperator(op, runtime, recDesc);
ILogicalOperator src = op.getInputs().get(0).getValue();
builder.contributeGraphEdge(src, 0, op, 0);
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
index 9b442ae..47a37d1 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java
@@ -140,7 +140,7 @@
//create the logical and physical operator
CommitOperator commitOperator = new
CommitOperator(primaryKeyLogicalVars, upsertVar, isSink);
CommitPOperator commitPOperator =
- new CommitPOperator(jobId, dataset, primaryKeyLogicalVars,
upsertVar, isSink);
+ new CommitPOperator(jobId, dataset, primaryKeyLogicalVars,
isSink);
commitOperator.setPhysicalOperator(commitPOperator);
//create ExtensionOperator and put the commitOperator in it.
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4ee1122..b2e8640 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -98,8 +98,9 @@
this.appCtx = appCtx;
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
- ReplicationProperties repProperties = ((IPropertiesProvider)
txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getAppContext()).getReplicationProperties();
+ ReplicationProperties repProperties =
+ ((IPropertiesProvider)
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext())
+ .getReplicationProperties();
replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
localResourceRepository = (PersistentLocalResourceRepository)
txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
@@ -240,7 +241,6 @@
jobCommitLogCount++;
break;
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
if (partitions.contains(logRecord.getResourcePartition()))
{
analyzeEntityCommitLog(logRecord);
entityCommitLogCount++;
@@ -406,7 +406,6 @@
case LogType.ENTITY_COMMIT:
case LogType.ABORT:
case LogType.FLUSH:
- case LogType.UPSERT_ENTITY_COMMIT:
case LogType.WAIT:
case LogType.MARKER:
//do nothing
@@ -599,13 +598,12 @@
}
break;
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
if
(activePartitions.contains(logRecord.getResourcePartition())) {
jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
entityCommitLogCount++;
if (IS_DEBUG_MODE) {
- LOGGER.info(Thread.currentThread().getId() +
"======> entity_commit[" + currentLSN
- + "]" + tempKeyTxnId);
+ LOGGER.info(Thread.currentThread().getId() +
"======> entity_commit[" + currentLSN + "]"
+ + tempKeyTxnId);
}
}
break;
@@ -687,10 +685,13 @@
(ILSMIndex)
datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
logRecord.getResourceId());
ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+ if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) {
indexAccessor.forceDelete(logRecord.getNewValue());
- } else if (logRecord.getNewOp() ==
IndexOperation.DELETE.ordinal()) {
+ } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) {
indexAccessor.forceInsert(logRecord.getNewValue());
+ } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) {
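+ // undo: restore the previous version by upserting the old value (NOTE(review): confirm getOldValue() is always present here, e.g. when the upsert created a new key)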
+ indexAccessor.forceUpsert(logRecord.getOldValue());
} else {
throw new IllegalStateException("Unsupported OperationType: "
+ logRecord.getNewOp());
}
@@ -706,10 +707,13 @@
ILSMIndex index = (ILSMIndex)
datasetLifecycleManager.getIndex(datasetId, resourceId);
ILSMIndexAccessor indexAccessor =
index.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
- if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
+ if (logRecord.getNewOp() == IndexOperation.INSERT_BYTE) {
indexAccessor.forceInsert(logRecord.getNewValue());
- } else if (logRecord.getNewOp() ==
IndexOperation.DELETE.ordinal()) {
+ } else if (logRecord.getNewOp() == IndexOperation.DELETE_BYTE) {
indexAccessor.forceDelete(logRecord.getNewValue());
+ } else if (logRecord.getNewOp() == IndexOperation.UPSERT_BYTE) {
+ // redo, upsert the new value
+ indexAccessor.forceUpsert(logRecord.getNewValue());
} else {
throw new IllegalStateException("Unsupported OperationType: "
+ logRecord.getNewOp());
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index cc12f36..47e212e 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -161,18 +161,18 @@
return new org.apache.asterix.common.transactions.JobId((int)
jobId.getId());
}
- public LSMInsertDeleteOperatorNodePushable
getInsertPipeline(IHyracksTaskContext ctx, Dataset dataset,
- IAType[] primaryKeyTypes, ARecordType recordType, ARecordType
metaType,
- ILSMMergePolicyFactory mergePolicyFactory, Map<String, String>
mergePolicyProperties, int[] filterFields,
- int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
- StorageComponentProvider storageComponentProvider) throws
AlgebricksException, HyracksDataException {
+ public IFrameWriter[] getInsertPipeline(IHyracksTaskContext ctx, Dataset
dataset, IAType[] primaryKeyTypes,
+ ARecordType recordType, ARecordType metaType,
ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties, int[] filterFields,
int[] primaryKeyIndexes,
+ List<Integer> primaryKeyIndicators, StorageComponentProvider
storageComponentProvider)
+ throws AlgebricksException, HyracksDataException {
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset,
primaryKeyTypes, recordType, metaType,
mergePolicyFactory, mergePolicyProperties, filterFields,
primaryKeyIndexes, primaryKeyIndicators,
storageComponentProvider);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
new
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(),
dataset.getDatasetId(),
- primaryIndexInfo.primaryKeyIndexes,
TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE, true);
+ primaryIndexInfo.primaryKeyIndexes,
TXN_SUBSYSTEM_PROVIDER, op, ResourceType.LSM_BTREE);
LSMTreeInsertDeleteOperatorDescriptor indexOpDesc =
getInsertOpratorDesc(primaryIndexInfo, modOpCallbackFactory);
IIndexDataflowHelperFactory dataflowHelperFactory =
@@ -185,7 +185,8 @@
primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION,
true);
insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
- return insertOp;
+ IFrameWriter[] pipeline = { insertOp, commitOp };
+ return pipeline;
}
public IPushRuntime getFullScanPipeline(IFrameWriter countOp,
IHyracksTaskContext ctx, Dataset dataset,
@@ -300,8 +301,7 @@
Index index = primaryIndexInfo.getIndex();
MetadataProvider mdProvider = new MetadataProvider(dataverse,
storageComponentProvider);
return dataset.getIndexDataflowHelperFactory(mdProvider, index,
primaryIndexInfo.recordType,
- primaryIndexInfo.metaType, primaryIndexInfo.mergePolicyFactory,
- primaryIndexInfo.mergePolicyProperties);
+ primaryIndexInfo.metaType,
primaryIndexInfo.mergePolicyFactory, primaryIndexInfo.mergePolicyProperties);
}
public IIndexDataflowHelper getPrimaryIndexDataflowHelper(Dataset dataset,
IAType[] primaryKeyTypes,
@@ -434,11 +434,10 @@
private Index index;
private IStorageComponentProvider storageComponentProvider;
- public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes,
ARecordType recordType,
- ARecordType metaType, ILSMMergePolicyFactory
mergePolicyFactory,
- Map<String, String> mergePolicyProperties, int[] filterFields,
int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators, IStorageComponentProvider
storageComponentProvider)
- throws AlgebricksException {
+ public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes,
ARecordType recordType, ARecordType metaType,
+ ILSMMergePolicyFactory mergePolicyFactory, Map<String, String>
mergePolicyProperties,
+ int[] filterFields, int[] primaryKeyIndexes, List<Integer>
primaryKeyIndicators,
+ IStorageComponentProvider storageComponentProvider) throws
AlgebricksException {
this.storageComponentProvider = storageComponentProvider;
this.dataset = dataset;
this.primaryKeyTypes = primaryKeyTypes;
@@ -477,10 +476,10 @@
index = new Index(dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName(),
IndexType.BTREE, keyFieldNames, keyFieldSourceIndicators,
keyFieldTypes, false, true,
MetadataUtil.PENDING_NO_OP);
- localResourceFactoryProvider =
getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider,
- index, dataset, primaryIndexTypeTraits,
primaryIndexComparatorFactories,
- primaryIndexBloomFilterKeyFields, mergePolicyFactory,
mergePolicyProperties, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields,
dataset.getIndexOperationTrackerFactory(index));
+ localResourceFactoryProvider =
getPrimaryIndexLocalResourceMetadataProvider(storageComponentProvider, index,
+ dataset, primaryIndexTypeTraits,
primaryIndexComparatorFactories, primaryIndexBloomFilterKeyFields,
+ mergePolicyFactory, mergePolicyProperties,
filterTypeTraits, filterCmpFactories, btreeFields,
+ filterFields,
dataset.getIndexOperationTrackerFactory(index));
}
public Index getIndex() {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index 1467dbf..435caf7 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -115,9 +115,10 @@
IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
ITransactionContext txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- LSMInsertDeleteOperatorNodePushable insertOp =
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ (LSMInsertDeleteOperatorNodePushable)
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
null, null, KEY_INDEXES,
- KEY_INDICATORS_LIST, storageManager);
+ KEY_INDICATORS_LIST, storageManager)[0];
insertOp.open();
TupleGenerator tupleGenerator = new
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS,
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 10e8658..0dec460 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -123,9 +123,10 @@
nc.newJobId();
ITransactionContext txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp =
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ LSMInsertDeleteOperatorNodePushable insertOp =
+ (LSMInsertDeleteOperatorNodePushable)
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
null, null, KEY_INDEXES, KEY_INDICATOR_LIST,
- storageManager);
+ storageManager)[0];
insertOp.open();
TupleGenerator tupleGenerator = new
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS,
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index d691b18..4822c01 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -133,7 +133,6 @@
buffer.putInt(jobId);
switch (logType) {
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
writeEntityInfo(buffer);
break;
case LogType.UPDATE:
@@ -268,7 +267,6 @@
computeAndSetLogSize();
break;
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
if (readEntityInfo(buffer)) {
computeAndSetLogSize();
} else {
@@ -428,7 +426,6 @@
logSize = JOB_TERMINATE_LOG_SIZE;
break;
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
logSize = ENTITY_COMMIT_LOG_BASE_SIZE + PKValueSize;
break;
case LogType.FLUSH:
@@ -457,7 +454,7 @@
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- if (logType == LogType.ENTITY_COMMIT || logType ==
LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
+ if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
builder.append(" ResourcePartition : ").append(resourcePartition);
builder.append(" PKHashValue : ").append(PKHashValue);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 269e4b9..11c45ad 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -25,7 +25,6 @@
public static final byte ENTITY_COMMIT = 2;
public static final byte ABORT = 3;
public static final byte FLUSH = 4;
- public static final byte UPSERT_ENTITY_COMMIT = 5;
public static final byte WAIT = 6;
public static final byte MARKER = 7;
@@ -34,7 +33,6 @@
private static final String STRING_ENTITY_COMMIT = "ENTITY_COMMIT";
private static final String STRING_ABORT = "ABORT";
private static final String STRING_FLUSH = "FLUSH";
- private static final String STRING_UPSERT_ENTITY_COMMIT =
"UPSERT_ENTITY_COMMIT";
private static final String STRING_WAIT = "WAIT";
private static final String STRING_MARKER = "MARKER";
private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
@@ -51,8 +49,6 @@
return STRING_ABORT;
case LogType.FLUSH:
return STRING_FLUSH;
- case LogType.UPSERT_ENTITY_COMMIT:
- return STRING_UPSERT_ENTITY_COMMIT;
case LogType.WAIT:
return STRING_WAIT;
case LogType.MARKER:
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 51790e6..01b4db2 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -444,7 +444,7 @@
// locks and secondary index doesn't.
return new
SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx,
transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, metadataStoragePartition,
ResourceType.LSM_BTREE, indexOp, false);
+ transactionSubsystem, resourceId, metadataStoragePartition,
ResourceType.LSM_BTREE, indexOp);
}
@Override
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index f5c6d9a..9645ccf 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1118,7 +1118,7 @@
? new
TempDatasetPrimaryIndexModificationOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider,
IndexOperation.UPSERT, ResourceType.LSM_BTREE)
: new UpsertOperationCallbackFactory(jobId, datasetId,
primaryKeyFields, txnSubsystemProvider,
- IndexOperation.UPSERT, ResourceType.LSM_BTREE,
dataset.hasMetaPart());
+ IndexOperation.UPSERT, ResourceType.LSM_BTREE);
LockThenSearchOperationCallbackFactory searchCallbackFactory = new
LockThenSearchOperationCallbackFactory(
jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
ResourceType.LSM_BTREE);
@@ -1319,8 +1319,7 @@
primaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_BTREE)
: new PrimaryIndexModificationOperationCallbackFactory(
((JobEventListenerFactory)
spec.getJobletEventListenerFactory()).getJobId(), datasetId,
- primaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_BTREE,
- dataset.hasMetaPart());
+ primaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1517,7 +1516,7 @@
ResourceType.LSM_BTREE)
: new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE, dataset.hasMetaPart());
+ ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1675,7 +1674,7 @@
ResourceType.LSM_RTREE)
: new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE, dataset.hasMetaPart());
+ ResourceType.LSM_RTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
@@ -1885,7 +1884,7 @@
ResourceType.LSM_INVERTED_INDEX)
: new
SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
modificationCallbackPrimaryKeyFields,
txnSubsystemProvider, indexOp,
- ResourceType.LSM_INVERTED_INDEX,
dataset.hasMetaPart());
+ ResourceType.LSM_INVERTED_INDEX);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
IIndexDataflowHelperFactory indexDataFlowFactory =
dataset.getIndexDataflowHelperFactory(this,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index 2e328f9..8ab9d10 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -29,7 +29,6 @@
import org.apache.asterix.active.ActiveJobNotificationHandler;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
@@ -37,6 +36,7 @@
import
org.apache.asterix.common.ioopcallbacks.LSMBTreeWithBuddyIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMInvertedIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.JobId;
import org.apache.asterix.common.utils.JobUtils;
@@ -520,18 +520,16 @@
} else if (index.isPrimaryIndex()) {
return op == IndexOperation.UPSERT
? new UpsertOperationCallbackFactory(jobId,
getDatasetId(), primaryKeyFields,
-
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
- hasMetaPart())
+
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType())
: op == IndexOperation.DELETE || op ==
IndexOperation.INSERT
? new
PrimaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
primaryKeyFields,
componentProvider.getTransactionSubsystemProvider(), op,
- index.resourceType(), hasMetaPart())
+ index.resourceType())
: NoOpOperationCallbackFactory.INSTANCE;
} else {
return op == IndexOperation.DELETE || op == IndexOperation.INSERT
|| op == IndexOperation.UPSERT
? new
SecondaryIndexModificationOperationCallbackFactory(jobId, getDatasetId(),
primaryKeyFields,
-
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType(),
- hasMetaPart())
+
componentProvider.getTransactionSubsystemProvider(), op, index.resourceType())
: NoOpOperationCallbackFactory.INSTANCE;
}
}
@@ -572,9 +570,9 @@
}
public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[]
primaryKeyFields,
- MetadataProvider metadataProvider, int upsertVarIdx, int[]
datasetPartitions, boolean isSink) {
+ MetadataProvider metadataProvider, int[] datasetPartitions,
boolean isSink) {
return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields,
- metadataProvider.isTemporaryDatasetWriteJob(),
metadataProvider.isWriteTransaction(), upsertVarIdx,
+ metadataProvider.isTemporaryDatasetWriteJob(),
metadataProvider.isWriteTransaction(),
datasetPartitions, isSink);
}
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index 044707a..85e2bef 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -462,7 +462,6 @@
switch (remoteLog.getLogType()) {
case LogType.UPDATE:
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
//if the log partition belongs to a partitions hosted
on this node, replicate it
if
(nodeHostedPartitions.contains(remoteLog.getResourcePartition())) {
logManager.log(remoteLog);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 6869523..d07bab3 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -49,7 +49,6 @@
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
-import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.IndexException;
import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
@@ -84,8 +83,8 @@
private final int metaFieldIndex;
private LockThenSearchOperationCallback searchCallback;
- public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor
opDesc, IHyracksTaskContext ctx,
- int partition, int[] fieldPermutation, IRecordDescriptorProvider
recordDescProvider, int numOfPrimaryKeys,
+ public LSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor
opDesc, IHyracksTaskContext ctx, int partition,
+ int[] fieldPermutation, IRecordDescriptorProvider
recordDescProvider, int numOfPrimaryKeys,
ARecordType recordType, int filterFieldIndex) throws
HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider,
IndexOperation.UPSERT);
this.key = new PermutingFrameTupleReference();
@@ -140,15 +139,15 @@
tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- modCallback =
opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResource(), ctx, this);
+ modCallback = opDesc.getModificationOpCallbackFactory()
+
.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
searchCallback = (LockThenSearchOperationCallback)
opDesc.getSearchOpCallbackFactory()
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAppRuntimeContext runtimeCtx = (IAppRuntimeContext)
ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAppRuntimeContext runtimeCtx =
+ (IAppRuntimeContext)
ctx.getJobletContext().getApplicationContext().getApplicationObject();
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Exception e) {
@@ -221,17 +220,19 @@
}
// if with filters, append the filter
if (isFiltered) {
- dos.write(prevTuple.getFieldData(filterFieldIndex),
- prevTuple.getFieldStart(filterFieldIndex),
+ dos.write(prevTuple.getFieldData(filterFieldIndex), prevTuple.getFieldStart(filterFieldIndex),
prevTuple.getFieldLength(filterFieldIndex));
tb.addFieldEndOffset();
}
- modCallback.setOp(Operation.DELETE);
- if (firstModification) {
- lsmAccessor.delete(prevTuple);
- firstModification = false;
- } else {
- lsmAccessor.forceDelete(prevTuple);
+ if (isNull(tuple, numOfPrimaryKeys)) {
+ // Only delete if it is a delete and not upsert
+ modCallback.setOp(IndexOperation.DELETE_BYTE);
+ if (firstModification) {
+ lsmAccessor.delete(prevTuple);
+ firstModification = false;
+ } else {
+ lsmAccessor.forceDelete(prevTuple);
+ }
}
} else {
prevTuple = null;
@@ -246,12 +247,12 @@
cursor.reset();
}
if (!isNull(tuple, numOfPrimaryKeys)) {
- modCallback.setOp(Operation.INSERT);
+ modCallback.setOp(IndexOperation.UPSERT_BYTE);
if (firstModification) {
- lsmAccessor.insert(tuple);
+ lsmAccessor.upsert(tuple);
firstModification = false;
} else {
- lsmAccessor.forceInsert(tuple);
+ lsmAccessor.forceUpsert(tuple);
}
recordWasInserted = true;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index eab9cc7..4dd1eca 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -24,7 +24,6 @@
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import
org.apache.hyracks.storage.am.common.tuples.PermutingFrameTupleReference;
@@ -109,12 +108,12 @@
}
if (!isPrevValueNull) {
// previous is not null, we need to delete previous
- modCallback.setOp(Operation.DELETE);
+ modCallback.setOp(IndexOperation.DELETE_BYTE);
lsmAccessor.forceDelete(prevValueTuple);
}
if (!isNewNull) {
// new is not null, we need to insert the new value
- modCallback.setOp(Operation.INSERT);
+ modCallback.setOp(IndexOperation.INSERT_BYTE);
lsmAccessor.forceInsert(tuple);
}
diff --git a/asterixdb/asterix-transactions/pom.xml
b/asterixdb/asterix-transactions/pom.xml
index a65a436..2ff0ccd 100644
--- a/asterixdb/asterix-transactions/pom.xml
+++ b/asterixdb/asterix-transactions/pom.xml
@@ -106,11 +106,6 @@
<scope>compile</scope>
</dependency>
<dependency>
- <groupId>org.apache.asterix</groupId>
- <artifactId>asterix-om</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
index 2a3467e..3893854 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java
@@ -28,14 +28,10 @@
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback.Operation;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
public abstract class AbstractIndexModificationOperationCallback extends
AbstractOperationCallback {
-
- private static final byte INSERT_OP = (byte) IndexOperation.INSERT.ordinal();
- private static final byte DELETE_OP = (byte) IndexOperation.DELETE.ordinal();
protected final long resourceId;
protected final byte resourceType;
protected final IndexOperation indexOp;
@@ -81,14 +77,7 @@
txnSubsystem.getLogManager().log(logRecord);
}
- public void setOp(Operation op) throws HyracksDataException {
- switch (op) {
- case DELETE:
- logRecord.setNewOp(DELETE_OP);
- break;
- case INSERT:
- logRecord.setNewOp(INSERT_OP);
- break;
- }
+ public void setOp(byte op) throws HyracksDataException {
+ logRecord.setNewOp(op);
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index c627367..0747a8d 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -40,16 +40,13 @@
implements IModificationOperationCallback {
private final LSMInsertDeleteOperatorNodePushable operatorNodePushable;
- private final boolean logBeforeImage;
public PrimaryIndexModificationOperationCallback(int datasetId, int[]
primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long
resourceId, int resourcePartition,
- byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable,
- boolean logBeforeImage) {
+ byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem,
resourceId, resourcePartition,
resourceType, indexOp);
this.operatorNodePushable = (LSMInsertDeleteOperatorNodePushable)
operatorNodePushable;
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -102,11 +99,7 @@
public void found(ITupleReference before, ITupleReference after) throws
HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- if (logBeforeImage) {
- log(pkHash, after, before);
- } else {
- log(pkHash, after, null);
- }
+ log(pkHash, after, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index 4cbb8cf..fcd4cd5 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -46,14 +46,11 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- private final boolean logBeforeImage;
public PrimaryIndexModificationOperationCallbackFactory(JobId jobId, int
datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
- boolean logBeforeImage) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
resourceType);
this.indexOp = indexOp;
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -72,7 +69,7 @@
Resource aResource = (Resource) resource.getResource();
IModificationOperationCallback modCallback = new
PrimaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(),
txnSubsystem, resource.getId(),
- aResource.partition(), resourceType, indexOp, operatorNodePushable, logBeforeImage);
+ aResource.partition(), resourceType, indexOp, operatorNodePushable);
txnCtx.registerIndexAndCallback(resource.getId(), index,
(AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
index 974e631..845e9d7 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallback.java
@@ -37,15 +37,13 @@
implements IModificationOperationCallback {
protected final IndexOperation oldOp;
- private final boolean logBeforeImage;
public SecondaryIndexModificationOperationCallback(int datasetId, int[]
primaryKeyFields,
ITransactionContext txnCtx, ILockManager lockManager,
ITransactionSubsystem txnSubsystem, long resourceId,
- int resourcePartition, byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
+ int resourcePartition, byte resourceType, IndexOperation indexOp) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem,
resourceId, resourcePartition,
resourceType, indexOp);
oldOp = (indexOp == IndexOperation.DELETE) ? IndexOperation.INSERT :
IndexOperation.DELETE;
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -57,7 +55,7 @@
public void found(ITupleReference before, ITupleReference after) throws
HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- this.log(pkHash, after, logBeforeImage ? before : null);
+ this.log(pkHash, after, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 3925bba..cce3710 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -42,14 +42,11 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- private final boolean logBeforeImage;
public SecondaryIndexModificationOperationCallbackFactory(JobId jobId, int
datasetId, int[] primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
- boolean logBeforeImage) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
resourceType);
this.indexOp = indexOp;
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -68,7 +65,7 @@
Resource aResource = (Resource) resource.getResource();
IModificationOperationCallback modCallback = new
SecondaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(),
txnSubsystem, resource.getId(),
- aResource.partition(), resourceType, indexOp, logBeforeImage);
+ aResource.partition(), resourceType, indexOp);
txnCtx.registerIndexAndCallback(resource.getId(), index,
(AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index 13d2d57..413ef65 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -29,14 +29,12 @@
public class UpsertOperationCallback extends
AbstractIndexModificationOperationCallback
implements IModificationOperationCallback {
- private final boolean logBeforeImage;
public UpsertOperationCallback(int datasetId, int[] primaryKeyFields,
ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long
resourceId, int resourcePartition,
- byte resourceType, IndexOperation indexOp, boolean logBeforeImage) {
+ byte resourceType, IndexOperation indexOp) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem,
resourceId, resourcePartition,
resourceType, indexOp);
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -48,7 +46,7 @@
public void found(ITupleReference before, ITupleReference after) throws
HyracksDataException {
try {
int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields);
- log(pkHash, after, logBeforeImage ? before : null);
+ log(pkHash, after, before);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 9349e93..cd702ed 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -41,14 +41,11 @@
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
- private final boolean logBeforeImage;
public UpsertOperationCallbackFactory(JobId jobId, int datasetId, int[]
primaryKeyFields,
- ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType,
- boolean logBeforeImage) {
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider,
resourceType);
this.indexOp = indexOp;
- this.logBeforeImage = logBeforeImage;
}
@Override
@@ -67,7 +64,7 @@
ITransactionContext txnCtx =
txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new
UpsertOperationCallback(datasetId, primaryKeyFields,
txnCtx, txnSubsystem.getLockManager(),
- txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp, logBeforeImage);
+ txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp);
txnCtx.registerIndexAndCallback(resource.getId(), index,
(AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 536e657..2752461 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -34,18 +34,16 @@
private final int[] primaryKeyFields;
private final boolean isTemporaryDatasetWriteJob;
private final boolean isWriteTransaction;
- private final int upsertVarIdx;
private int[] datasetPartitions;
private final boolean isSink;
public CommitRuntimeFactory(JobId jobId, int datasetId, int[]
primaryKeyFields, boolean isTemporaryDatasetWriteJob,
- boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) {
+ boolean isWriteTransaction, int[] datasetPartitions, boolean isSink) {
this.jobId = jobId;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob;
this.isWriteTransaction = isWriteTransaction;
- this.upsertVarIdx = upsertVarIdx;
this.datasetPartitions = datasetPartitions;
this.isSink = isSink;
}
@@ -57,13 +55,7 @@
@Override
public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
- if (upsertVarIdx >= 0) {
- return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob,
- isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()],
- upsertVarIdx, isSink);
- } else {
return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields,
isTemporaryDatasetWriteJob,
isWriteTransaction,
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink);
- }
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
deleted file mode 100644
index 9b2fe36..0000000
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.runtime;
-
-import java.nio.ByteBuffer;
-
-import org.apache.asterix.common.transactions.JobId;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.utils.TransactionUtil;
-import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-
-public class UpsertCommitRuntime extends CommitRuntime {
- private final int upsertIdx;
-
- public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields,
- boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx,
- boolean isSink) {
- super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction,
- resourcePartition, isSink);
- this.upsertIdx = upsertIdx;
- }
-
- @Override
- protected void formLogRecord(ByteBuffer buffer, int t) {
- boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength()
- + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1);
- if (isNull) {
- // Previous record not found (insert)
- super.formLogRecord(buffer, t);
- } else {
- // Previous record found (delete + insert)
- int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
- TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
- primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT);
- }
- }
-}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 424e800..15918d1 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -255,18 +255,13 @@
LogRecord logRecord = logBufferTailReader.next();
while (logRecord != null) {
if (logRecord.getLogSource() == LogSource.LOCAL) {
- if (logRecord.getLogType() == LogType.ENTITY_COMMIT
- || logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
+ if (logRecord.getLogType() == LogType.ENTITY_COMMIT) {
reusableJobId.setId(logRecord.getJobId());
txnCtx =
txnSubsystem.getTransactionManager().getTransactionContext(reusableJobId,
false);
reusableDsId.setId(logRecord.getDatasetId());
txnSubsystem.getLockManager().unlock(reusableDsId,
logRecord.getPKHashValue(), LockMode.ANY,
txnCtx);
txnCtx.notifyOptracker(false);
- if (logRecord.getLogType() == LogType.UPSERT_ENTITY_COMMIT) {
- // since this operation consisted of delete and insert, we need to notify the optracker twice
- txnCtx.notifyOptracker(false);
- }
if (TransactionUtil.PROFILE_MODE) {
txnSubsystem.incrementEntityCommitCount();
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index d885f00..e3ad3dd 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -48,7 +48,6 @@
if (shouldReplicate) {
switch (logRecord.getLogType()) {
case LogType.ENTITY_COMMIT:
- case LogType.UPSERT_ENTITY_COMMIT:
case LogType.UPDATE:
case LogType.FLUSH:
shouldReplicate =
replicationStrategy.isMatch(logRecord.getDatasetId());
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
index 89d043c..70c5557 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputPushRuntime.java
@@ -29,4 +29,8 @@
this.inputRecordDesc = recordDescriptor;
}
+ public RecordDescriptor getInputRecordDescriptor() {
+ return inputRecordDesc;
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
index f22c239..1684923 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IModificationOperationCallback.java
@@ -29,10 +29,6 @@
* @author zheilbron
*/
public interface IModificationOperationCallback {
- public enum Operation {
- INSERT,
- DELETE
- }
/**
* This method is called on a tuple that is about to traverse an index's
structure
@@ -60,5 +56,5 @@
* @param op
* @throws HyracksDataException
*/
- public void setOp(Operation op) throws HyracksDataException;
+ public void setOp(byte op) throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
index e8ab8dc..98173bd 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java
@@ -61,7 +61,7 @@
}
@Override
- public void setOp(Operation op) throws HyracksDataException {
+ public void setOp(byte op) throws HyracksDataException {
// Do nothing.
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
index 43e0889..b416f5a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java
@@ -32,5 +32,8 @@
MERGE,
FULL_MERGE,
FLUSH,
- REPLICATE
+ REPLICATE;
+ public static final byte INSERT_BYTE = (byte) INSERT.ordinal();
+ public static final byte DELETE_BYTE = (byte) DELETE.ordinal();
+ public static final byte UPSERT_BYTE = (byte) UPSERT.ordinal();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index 53b8405..473e28d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -57,10 +57,10 @@
import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFactory;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
@@ -286,10 +286,10 @@
case PHYSICALDELETE:
case FLUSH:
case DELETE:
+ case UPSERT:
operationalComponents.add(memoryComponents.get(cmc));
break;
case INSERT:
- case UPSERT:
addOperationalMutableComponents(operationalComponents);
operationalComponents.addAll(immutableComponents);
break;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
index fecc674..3e3b0d4 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java
@@ -117,6 +117,8 @@
void forceInsert(ITupleReference tuple) throws HyracksDataException,
IndexException;
+ void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException;
+
void forceDelete(ITupleReference tuple) throws HyracksDataException,
IndexException;
void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean
bulkload, LSMOperationType opType)
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index 4199cfb..af62cbf 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@ -160,6 +160,12 @@
}
@Override
+ public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ ctx.setOperation(IndexOperation.UPSERT);
+ lsmHarness.forceModify(ctx, tuple);
+ }
+
+ @Override
public void forceDelete(ITupleReference tuple) throws
HyracksDataException, IndexException {
ctx.setOperation(IndexOperation.DELETE);
lsmHarness.forceModify(ctx, tuple);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
index cef4257..ff09f5b 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java
@@ -185,4 +185,9 @@
throw new UnsupportedOperationException("Cannot open inverted list
cursor on lsm inverted index.");
}
+ @Override
+ public void forceUpsert(ITupleReference tuple) throws HyracksDataException, IndexException {
+ throw new UnsupportedOperationException("Upsert not supported by lsm inverted index.");
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
index 6da9334..1dc3a73 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java
@@ -100,7 +100,7 @@
}
@Override
- public void setOp(Operation op) throws HyracksDataException {
+ public void setOp(byte op) throws HyracksDataException {
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
index 15d8a60..089595f 100644
---
a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
+++
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java
@@ -67,7 +67,7 @@
}
@Override
- public void setOp(Operation op) throws HyracksDataException {
+ public void setOp(byte op) throws HyracksDataException {
// Do nothing.
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1554
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ice5296267033cd7debe76894c864c6411f761d83
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>