Ian Maxon has submitted this change and it was merged. Change subject: [ASTERIXDB-1952][TX][IDX] Filter logs pt.2 ......................................................................
[ASTERIXDB-1952][TX][IDX] Filter logs pt.2 - user model changes: no - storage format changes: yes - interface changes: yes Details: - Add a log type specifically for filters - Only log change when filter actually widens - Stop logging of index + filter tuple during modification - Redo index and filter tuples separately via their logs Change-Id: Ie9e7795d9c8c212e8610dcb9bb5d26ec9fbbee8a Reviewed-on: https://asterix-gerrit.ics.uci.edu/1857 Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ian Maxon <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java A hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java A hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java M hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java M hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java M hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java 47 files changed, 575 insertions(+), 408 deletions(-) Approvals: Anon. E. 
Moose #1000171: Jenkins: Verified; ; Verified Ian Maxon: Looks good to me, approved Objections: Jenkins: Violations found diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 74277ce..4a2cf2d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -77,6 +77,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.common.IIndex; @@ -221,6 +222,7 @@ case LogType.FLUSH: case LogType.WAIT: case LogType.MARKER: + case LogType.FILTER: break; default: throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); @@ -315,59 +317,64 @@ foundWinner = true; } } - if (foundWinner) { - resourceId = logRecord.getResourceId(); - localResource = resourcesMap.get(resourceId); - /******************************************************************* - * [Notice] - * -> Issue - * Delete index may cause a problem during redo. - * The index operation to be redone couldn't be redone because the corresponding index - * may not exist in NC due to the possible index drop DDL operation. - * -> Approach - * Avoid the problem during redo. - * More specifically, the problem will be detected when the localResource of - * the corresponding index is retrieved, which will end up with 'null'. - * If null is returned, then just go and process the next - * log record. 
- *******************************************************************/ - if (localResource == null) { - LOGGER.log(Level.WARN, "resource was not found for resource id " + resourceId); - logRecord = logReader.next(); - continue; - } - /*******************************************************************/ + if (!foundWinner) { + break; + } + } + //fall through as FILTER is a subset of UPDATE + case LogType.FILTER: + if (partitions.contains(logRecord.getResourcePartition())) { + resourceId = logRecord.getResourceId(); + localResource = resourcesMap.get(resourceId); + /******************************************************************* + * [Notice] + * -> Issue + * Delete index may cause a problem during redo. + * The index operation to be redone couldn't be redone because the corresponding index + * may not exist in NC due to the possible index drop DDL operation. + * -> Approach + * Avoid the problem during redo. + * More specifically, the problem will be detected when the localResource of + * the corresponding index is retrieved, which will end up with 'null'. + * If null is returned, then just go and process the next + * log record. + *******************************************************************/ + if (localResource == null) { + LOGGER.log(Level.WARN, "resource was not found for resource id " + resourceId); + logRecord = logReader.next(); + continue; + } + /*******************************************************************/ - //get index instance from IndexLifeCycleManager - //if index is not registered into IndexLifeCycleManager, - //create the index using LocalMetadata stored in LocalResourceRepository - //get partition path in this node - localResourceMetadata = (DatasetLocalResource) localResource.getResource(); - index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath()); - if (index == null) { - //#. 
create index instance and register to indexLifeCycleManager - index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx); - datasetLifecycleManager.register(localResource.getPath(), index); - datasetLifecycleManager.open(localResource.getPath()); - try { - final DatasetResourceReference resourceReference = - DatasetResourceReference.of(localResource); - maxDiskLastLsn = - indexCheckpointManagerProvider.get(resourceReference).getLowWatermark(); - } catch (HyracksDataException e) { - datasetLifecycleManager.close(localResource.getPath()); - throw e; - } - //#. set resourceId and maxDiskLastLSN to the map - resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); - } else { - maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + //get index instance from IndexLifeCycleManager + //if index is not registered into IndexLifeCycleManager, + //create the index using LocalMetadata stored in LocalResourceRepository + //get partition path in this node + localResourceMetadata = (DatasetLocalResource) localResource.getResource(); + index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath()); + if (index == null) { + //#. create index instance and register to indexLifeCycleManager + index = (ILSMIndex) localResourceMetadata.createInstance(serviceCtx); + datasetLifecycleManager.register(localResource.getPath(), index); + datasetLifecycleManager.open(localResource.getPath()); + try { + final DatasetResourceReference resourceReference = + DatasetResourceReference.of(localResource); + maxDiskLastLsn = + indexCheckpointManagerProvider.get(resourceReference).getLowWatermark(); + } catch (HyracksDataException e) { + datasetLifecycleManager.close(localResource.getPath()); + throw e; } - // lsn @ maxDiskLastLsn is either a flush log or a master replica log - if (lsn >= maxDiskLastLsn) { - redo(logRecord, datasetLifecycleManager); - redoCount++; - } + //#. 
set resourceId and maxDiskLastLSN to the map + resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); + } else { + maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + } + // lsn @ maxDiskLastLsn is either a flush log or a master replica log + if (lsn >= maxDiskLastLsn) { + redo(logRecord, datasetLifecycleManager); + redoCount++; } } break; @@ -659,6 +666,7 @@ throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort."); case LogType.ABORT: case LogType.FLUSH: + case LogType.FILTER: case LogType.WAIT: case LogType.MARKER: //ignore @@ -741,6 +749,9 @@ // undo, upsert the old value if found, otherwise, physical delete undoUpsertOrDelete(indexAccessor, logRecord); break; + case AbstractIndexModificationOperationCallback.FILTER_BYTE: + //do nothing, can't undo filters + break; default: throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); } @@ -775,6 +786,9 @@ long resourceId = logRecord.getResourceId(); ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); + ILSMIndexOperationContext opCtx = indexAccessor.getOpContext(); + opCtx.setFilterSkip(true); + opCtx.setRecovery(true); if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) { indexAccessor.forceInsert(logRecord.getNewValue()); } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.DELETE_BYTE) { @@ -782,6 +796,9 @@ } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.UPSERT_BYTE) { // redo, upsert the new value indexAccessor.forceUpsert(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.FILTER_BYTE) { + opCtx.setFilterSkip(false); + indexAccessor.updateFilter(logRecord.getNewValue()); } else { throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); } diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java index e58a6fa..04f9751 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java @@ -50,121 +50,125 @@ int FLUSHING_COMPONENT_MAXID_LEN = Long.BYTES; int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + TxnId.BYTES; - int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN; + int ENTITY_RESOURCE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES; + int ENTITY_VALUE_HEADER_LEN = PKHASH_LEN + PKSZ_LEN; int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN; int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN; int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; - int ENTITY_COMMIT_LOG_BASE_SIZE = ALL_RECORD_HEADER_LEN + ENTITYCOMMIT_UPDATE_HEADER_LEN + CHKSUM_LEN; + int ENTITY_COMMIT_LOG_BASE_SIZE = + ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + ENTITY_VALUE_HEADER_LEN + CHKSUM_LEN; int UPDATE_LOG_BASE_SIZE = ENTITY_COMMIT_LOG_BASE_SIZE + UPDATE_LSN_HEADER + UPDATE_BODY_HEADER; + int FILTER_LOG_BASE_SIZE = + ALL_RECORD_HEADER_LEN + ENTITY_RESOURCE_HEADER_LEN + UPDATE_BODY_HEADER + UPDATE_LSN_HEADER + CHKSUM_LEN; int FLUSH_LOG_SIZE = ALL_RECORD_HEADER_LEN + DS_LEN + RS_PARTITION_LEN + FLUSHING_COMPONENT_MINID_LEN + FLUSHING_COMPONENT_MAXID_LEN + CHKSUM_LEN; int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN; int MARKER_BASE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN; - public RecordReadStatus readLogRecord(ByteBuffer buffer); + RecordReadStatus readLogRecord(ByteBuffer buffer); - public void writeLogRecord(ByteBuffer buffer); + void writeLogRecord(ByteBuffer buffer); - public ITransactionContext 
getTxnCtx(); + ITransactionContext getTxnCtx(); - public void setTxnCtx(ITransactionContext txnCtx); + void setTxnCtx(ITransactionContext txnCtx); - public boolean isFlushed(); + boolean isFlushed(); - public void isFlushed(boolean isFlushed); + void isFlushed(boolean isFlushed); - public byte getLogType(); + byte getLogType(); - public void setLogType(byte logType); + void setLogType(byte logType); long getTxnId(); void setTxnId(long jobId); - public int getDatasetId(); + int getDatasetId(); - public void setDatasetId(int datasetId); + void setDatasetId(int datasetId); - public int getPKHashValue(); + int getPKHashValue(); - public void setPKHashValue(int PKHashValue); + void setPKHashValue(int PKHashValue); - public long getResourceId(); + long getResourceId(); - public void setResourceId(long resourceId); + void setResourceId(long resourceId); - public int getLogSize(); + int getLogSize(); - public void setLogSize(int logSize); + void setLogSize(int logSize); - public byte getNewOp(); + byte getNewOp(); - public void setNewOp(byte newOp); + void setNewOp(byte newOp); - public void setNewValueSize(int newValueSize); + void setNewValueSize(int newValueSize); - public ITupleReference getNewValue(); + ITupleReference getNewValue(); - public void setNewValue(ITupleReference newValue); + void setNewValue(ITupleReference newValue); - public long getChecksum(); + long getChecksum(); - public void setChecksum(long checksum); + void setChecksum(long checksum); - public long getLSN(); + long getLSN(); - public void setLSN(long LSN); + void setLSN(long LSN); - public String getLogRecordForDisplay(); + String getLogRecordForDisplay(); - public void computeAndSetLogSize(); + void computeAndSetLogSize(); - public int getPKValueSize(); + int getPKValueSize(); - public ITupleReference getPKValue(); + ITupleReference getPKValue(); - public void setPKFields(int[] primaryKeyFields); + void setPKFields(int[] primaryKeyFields); - public void computeAndSetPKValueSize(); + void 
computeAndSetPKValueSize(); - public void setPKValue(ITupleReference PKValue); + void setPKValue(ITupleReference PKValue); - public void readRemoteLog(ByteBuffer buffer); + void readRemoteLog(ByteBuffer buffer); - public void setLogSource(byte logSource); + void setLogSource(byte logSource); - public byte getLogSource(); + byte getLogSource(); - public int getRemoteLogSize(); + int getRemoteLogSize(); - public int getResourcePartition(); + int getResourcePartition(); - public void setResourcePartition(int resourcePartition); + void setResourcePartition(int resourcePartition); - public void setReplicated(boolean replicated); + void setReplicated(boolean replicated); /** * @return a flag indicating whether the log was replicated */ - public boolean isReplicated(); + boolean isReplicated(); - public void writeRemoteLogRecord(ByteBuffer buffer); + void writeRemoteLogRecord(ByteBuffer buffer); - public ITupleReference getOldValue(); + ITupleReference getOldValue(); - public void setOldValue(ITupleReference tupleBefore); + void setOldValue(ITupleReference tupleBefore); - public void setOldValueSize(int beforeSize); + void setOldValueSize(int beforeSize); - public boolean isMarker(); + boolean isMarker(); - public ByteBuffer getMarker(); + ByteBuffer getMarker(); - public void logAppended(long lsn); + void logAppended(long lsn); - public long getPreviousMarkerLSN(); + long getPreviousMarkerLSN(); /** * Sets flag indicating if this log should be replicated or not diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java index d85fd70..5fdb4e2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java @@ -24,6 +24,7 @@ import java.util.zip.CRC32; import 
org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.commons.lang3.ArrayUtils; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; @@ -36,18 +37,20 @@ * LogType(1) * TxnId(8) * --------------------------- - * [Header2] (16 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types + * [Header2] (8 bytes) : for entity_commit, upsert_entity_commit, filter and update log types * DatasetId(4) //stored in dataset_dataset in Metadata Node * ResourcePartition(4) + * --------------------------- + * [Header3] (8 bytes + PKValueSize) : for entity_commit, upsert_entity_commit, and update log types * PKHashValue(4) * PKValueSize(4) * PKValue(PKValueSize) * --------------------------- - * [Header3] (12 bytes) : only for update log type + * [Header4] (12 bytes) : only for update, filter log type * ResourceId(8) //stored in .metadata of the corresponding index in NC node * LogRecordSize(4) * --------------------------- - * [Body] (9 bytes + NewValueSize) : only for update log type + * [Body] (9 bytes + NewValueSize) : only for update, filter log type * FieldCnt(4) * NewOp(1) * NewValueSize(4) @@ -57,7 +60,6 @@ * Checksum(8) * --------------------------- */ - public class LogRecord implements ILogRecord { // ------------- fields in a log record (begin) ------------// @@ -125,10 +127,12 @@ buffer.putLong(txnId); switch (logType) { case LogType.ENTITY_COMMIT: - writeEntityInfo(buffer); + writeEntityResource(buffer); + writeEntityValue(buffer); break; case LogType.UPDATE: - writeEntityInfo(buffer); + writeEntityResource(buffer); + writeEntityValue(buffer); buffer.putLong(resourceId); buffer.putInt(logSize); buffer.putInt(newValueFieldCount); @@ -140,6 +144,15 @@ buffer.putInt(oldValueFieldCount); writeTuple(buffer, oldValue, oldValueSize); } + break; + case 
LogType.FILTER: + writeEntityResource(buffer); + buffer.putLong(resourceId); + buffer.putInt(logSize); + buffer.putInt(newValueFieldCount); + buffer.put(newOp); + buffer.putInt(newValueSize); + writeTuple(buffer, newValue, newValueSize); break; case LogType.FLUSH: buffer.putInt(datasetId); @@ -159,15 +172,18 @@ } } - private void writeEntityInfo(ByteBuffer buffer) { - buffer.putInt(resourcePartition); - buffer.putInt(datasetId); + private void writeEntityValue(ByteBuffer buffer) { buffer.putInt(PKHashValue); if (PKValueSize <= 0) { throw new IllegalStateException("Primary Key Size is less than or equal to 0"); } buffer.putInt(PKValueSize); writePKValue(buffer); + } + + private void writeEntityResource(ByteBuffer buffer) { + buffer.putInt(resourcePartition); + buffer.putInt(datasetId); } @Override @@ -264,51 +280,18 @@ computeAndSetLogSize(); break; case LogType.ENTITY_COMMIT: - if (readEntityInfo(buffer)) { + if (readEntityResource(buffer) && readEntityValue(buffer)) { computeAndSetLogSize(); } else { return RecordReadStatus.TRUNCATED; } break; case LogType.UPDATE: - if (readEntityInfo(buffer)) { - if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) { - return RecordReadStatus.TRUNCATED; - } - resourceId = buffer.getLong(); - logSize = buffer.getInt(); - newValueFieldCount = buffer.getInt(); - newOp = buffer.get(); - newValueSize = buffer.getInt(); - if (buffer.remaining() < newValueSize) { - if (logSize > buffer.capacity()) { - return RecordReadStatus.LARGE_RECORD; - } - return RecordReadStatus.TRUNCATED; - } - newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize); - if (logSize > getUpdateLogSizeWithoutOldValue()) { - // Prev Image exists - if (buffer.remaining() < Integer.BYTES) { - return RecordReadStatus.TRUNCATED; - } - oldValueSize = buffer.getInt(); - if (buffer.remaining() < Integer.BYTES) { - return RecordReadStatus.TRUNCATED; - } - oldValueFieldCount = buffer.getInt(); - if (buffer.remaining() < oldValueSize) { - 
return RecordReadStatus.TRUNCATED; - } - oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize); - } else { - oldValueSize = 0; - oldValue = null; - } + if (readEntityResource(buffer) && readEntityValue(buffer)) { + return readUpdateInfo(buffer); } else { return RecordReadStatus.TRUNCATED; } - break; case LogType.MARKER: if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) { return RecordReadStatus.TRUNCATED; @@ -331,19 +314,23 @@ marker.position(lenRemaining); marker.flip(); break; + case LogType.FILTER: + if (readEntityResource(buffer)) { + return readUpdateInfo(buffer); + } else { + return RecordReadStatus.TRUNCATED; + } default: break; } return RecordReadStatus.OK; } - private boolean readEntityInfo(ByteBuffer buffer) { + private boolean readEntityValue(ByteBuffer buffer) { //attempt to read in the resourcePartition, dsid, PK hash and PK length - if (buffer.remaining() < ENTITYCOMMIT_UPDATE_HEADER_LEN) { + if (buffer.remaining() < ENTITY_VALUE_HEADER_LEN) { return false; } - resourcePartition = buffer.getInt(); - datasetId = buffer.getInt(); PKHashValue = buffer.getInt(); PKValueSize = buffer.getInt(); // attempt to read in the PK @@ -355,6 +342,53 @@ } PKValue = readPKValue(buffer); return true; + } + + private boolean readEntityResource(ByteBuffer buffer) { + //attempt to read in the resourcePartition and dsid + if (buffer.remaining() < ENTITY_RESOURCE_HEADER_LEN) { + return false; + } + resourcePartition = buffer.getInt(); + datasetId = buffer.getInt(); + return true; + } + + private RecordReadStatus readUpdateInfo(ByteBuffer buffer) { + if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) { + return RecordReadStatus.TRUNCATED; + } + resourceId = buffer.getLong(); + logSize = buffer.getInt(); + newValueFieldCount = buffer.getInt(); + newOp = buffer.get(); + newValueSize = buffer.getInt(); + if (buffer.remaining() < newValueSize) { + if (logSize > buffer.capacity()) { + return 
RecordReadStatus.LARGE_RECORD; + } + return RecordReadStatus.TRUNCATED; + } + newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize); + if (logSize > getUpdateLogSizeWithoutOldValue()) { + // Prev Image exists + if (buffer.remaining() < Integer.BYTES) { + return RecordReadStatus.TRUNCATED; + } + oldValueSize = buffer.getInt(); + if (buffer.remaining() < Integer.BYTES) { + return RecordReadStatus.TRUNCATED; + } + oldValueFieldCount = buffer.getInt(); + if (buffer.remaining() < oldValueSize) { + return RecordReadStatus.TRUNCATED; + } + oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize); + } else { + oldValueSize = 0; + oldValue = null; + } + return RecordReadStatus.OK; } @Override @@ -403,6 +437,10 @@ } } + private int getFilterLogSize() { + return FILTER_LOG_BASE_SIZE + newValueSize; + } + private int getUpdateLogSizeWithoutOldValue() { return UPDATE_LOG_BASE_SIZE + PKValueSize + newValueSize; } @@ -425,6 +463,9 @@ break; case LogType.WAIT: logSize = WAIT_LOG_SIZE; + break; + case LogType.FILTER: + logSize = getFilterLogSize(); break; case LogType.MARKER: setMarkerLogSize(); @@ -499,8 +540,8 @@ } @Override - public void setTxnId(long jobId) { - this.txnId = jobId; + public void setTxnId(long txnId) { + this.txnId = txnId; } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java index 11c45ad..f02b0de 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java @@ -26,7 +26,8 @@ public static final byte ABORT = 3; public static final byte FLUSH = 4; public static final byte WAIT = 6; - public static final byte MARKER = 7; + public static final byte FILTER = 7; + public static final byte MARKER = 8; private static final String 
STRING_UPDATE = "UPDATE"; private static final String STRING_JOB_COMMIT = "JOB_COMMIT"; @@ -34,6 +35,7 @@ private static final String STRING_ABORT = "ABORT"; private static final String STRING_FLUSH = "FLUSH"; private static final String STRING_WAIT = "WAIT"; + private static final String STRING_FILTER = "FILTER"; private static final String STRING_MARKER = "MARKER"; private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE"; @@ -51,6 +53,8 @@ return STRING_FLUSH; case LogType.WAIT: return STRING_WAIT; + case LogType.FILTER: + return STRING_FILTER; case LogType.MARKER: return STRING_MARKER; default: diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java index 19536f6..9b749fa 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexOperationContext.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; @@ -45,6 +46,8 @@ private IndexOperation op; private LSMIOOperationType ioOperationType; private ILSMDiskComponent newComponent; + private boolean filterSkip = false; + private boolean isRecovery = false; public TestLSMIndexOperationContext(ILSMIndex index) { this.index = index; @@ -89,7 +92,7 @@ } @Override - public IModificationOperationCallback getModificationCallback() { + public IExtendedModificationOperationCallback getModificationCallback() { return 
NoOpOperationCallback.INSTANCE; } @@ -156,6 +159,27 @@ } @Override + public boolean isFilterSkipped() { + return filterSkip; + } + + @Override + public void setFilterSkip(boolean skip) { + this.filterSkip = skip; + } + + @Override + public boolean isRecovery() { + return isRecovery; + } + + @Override + public void setRecovery(boolean recovery) { + this.isRecovery = recovery; + + } + + @Override public LSMIOOperationType getIoOperationType() { return ioOperationType; } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java index 6189e37..b094d9e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsProcessor.java @@ -53,6 +53,7 @@ switch (reusableLog.getLogType()) { case LogType.UPDATE: case LogType.ENTITY_COMMIT: + case LogType.FILTER: logManager.log(reusableLog); break; case LogType.JOB_COMMIT: diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java index 3da9e83..8746fba 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/AbstractIndexModificationOperationCallback.java @@ -29,20 +29,24 @@ import org.apache.asterix.common.transactions.LogType; import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter; import org.apache.hyracks.storage.common.IModificationOperationCallback; public abstract class AbstractIndexModificationOperationCallback extends AbstractOperationCallback - implements IModificationOperationCallback { + implements IExtendedModificationOperationCallback { public static final byte INSERT_BYTE = 0x01; public static final byte DELETE_BYTE = 0x02; public static final byte UPSERT_BYTE = 0x03; + public static final byte FILTER_BYTE = 0x04; public enum Operation { INSERT(INSERT_BYTE), DELETE(DELETE_BYTE), - UPSERT(UPSERT_BYTE); + UPSERT(UPSERT_BYTE), + FILTER_MOD(FILTER_BYTE); + private byte value; Operation(byte value) { @@ -59,6 +63,8 @@ return DELETE; case INSERT: return INSERT; + case FILTER_MOD: + return FILTER_MOD; case UPSERT: return UPSERT; default: @@ -71,7 +77,8 @@ protected final byte resourceType; protected final Operation indexOp; protected final ITransactionSubsystem txnSubsystem; - protected final ILogRecord logRecord; + protected final ILogRecord indexRecord; + protected final ILogRecord filterRecord; protected AbstractIndexModificationOperationCallback(DatasetId datasetId, int[] primaryKeyFields, ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, @@ -80,35 +87,52 @@ this.resourceType = resourceType; this.indexOp = indexOp; this.txnSubsystem = txnSubsystem; - logRecord = new LogRecord(); - logRecord.setTxnCtx(txnCtx); - logRecord.setLogType(LogType.UPDATE); - logRecord.setTxnId(txnCtx.getTxnId().getId()); - logRecord.setDatasetId(datasetId.getId()); - logRecord.setResourceId(resourceId); - logRecord.setResourcePartition(resourcePartition); - logRecord.setNewOp(indexOp.value()); + indexRecord = 
new LogRecord(); + indexRecord.setTxnCtx(txnCtx); + indexRecord.setLogType(LogType.UPDATE); + indexRecord.setTxnId(txnCtx.getTxnId().getId()); + indexRecord.setDatasetId(datasetId.getId()); + indexRecord.setResourceId(resourceId); + indexRecord.setResourcePartition(resourcePartition); + indexRecord.setNewOp(indexOp.value()); + filterRecord = new LogRecord(); + filterRecord.setTxnCtx(txnCtx); + filterRecord.setLogType(LogType.FILTER); + filterRecord.setDatasetId(datasetId.getId()); + filterRecord.setTxnId(txnCtx.getTxnId().getId()); + filterRecord.setResourceId(resourceId); + filterRecord.setResourcePartition(resourcePartition); + filterRecord.setNewOp(Operation.FILTER_MOD.value()); } protected void log(int PKHash, ITupleReference newValue, ITupleReference oldValue) throws ACIDException { - logRecord.setPKHashValue(PKHash); - logRecord.setPKFields(primaryKeyFields); - logRecord.setPKValue(newValue); - logRecord.computeAndSetPKValueSize(); + indexRecord.setPKHashValue(PKHash); + indexRecord.setPKFields(primaryKeyFields); + indexRecord.setPKValue(newValue); + indexRecord.computeAndSetPKValueSize(); if (newValue != null) { - logRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue)); - logRecord.setNewValue(newValue); + indexRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue)); + indexRecord.setNewValue(newValue); } else { - logRecord.setNewValueSize(0); + indexRecord.setNewValueSize(0); } if (oldValue != null) { - logRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue)); - logRecord.setOldValue(oldValue); + indexRecord.setOldValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(oldValue)); + indexRecord.setOldValue(oldValue); } else { - logRecord.setOldValueSize(0); + indexRecord.setOldValueSize(0); } - logRecord.computeAndSetLogSize(); - txnSubsystem.getLogManager().log(logRecord); + indexRecord.computeAndSetLogSize(); + txnSubsystem.getLogManager().log(indexRecord); + } + + public void after(ITupleReference 
newValue) throws HyracksDataException { + if (newValue != null) { + filterRecord.setNewValueSize(SimpleTupleWriter.INSTANCE.bytesRequired(newValue)); + filterRecord.setNewValue(newValue); + filterRecord.computeAndSetLogSize(); + txnSubsystem.getLogManager().log(filterRecord); + } } /** @@ -116,9 +140,8 @@ * a single operator to perform different operations per tuple * * @param op - * @throws HyracksDataException */ - public void setOp(Operation op) throws HyracksDataException { - logRecord.setNewOp(op.value()); + public void setOp(Operation op) { + indexRecord.setNewOp(op.value()); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java index 2c8079d..3e41264 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java @@ -104,10 +104,10 @@ } private void logWait() throws ACIDException { - logRecord.setLogType(LogType.WAIT); - logRecord.computeAndSetLogSize(); - txnSubsystem.getLogManager().log(logRecord); + indexRecord.setLogType(LogType.WAIT); + indexRecord.computeAndSetLogSize(); + txnSubsystem.getLogManager().log(indexRecord); // set the log type back to UPDATE for normal updates - logRecord.setLogType(LogType.UPDATE); + indexRecord.setLogType(LogType.UPDATE); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java index a1aec1a..1e13883 
100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java @@ -48,6 +48,7 @@ case LogType.ENTITY_COMMIT: case LogType.UPDATE: case LogType.FLUSH: + case LogType.FILTER: shouldReplicate = replicationStrategy.isMatch(logRecord.getDatasetId()); if (shouldReplicate && !replicatedTxn.contains(logRecord.getTxnId())) { replicatedTxn.add(logRecord.getTxnId()); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java index 077a006..fb8770e 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java @@ -300,7 +300,7 @@ } // fall-through case SUFFICIENT_CONTIGUOUS_SPACE: { - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().insert(tuple, targetTupleIndex); ctx.getSplitKey().reset(); break; @@ -308,7 +308,7 @@ case SUFFICIENT_SPACE: { int finalIndex = ctx.getLeafFrame().compact() ? ctx.getLeafFrame().findInsertTupleIndex(tuple) : targetTupleIndex; - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().insert(tuple, finalIndex); ctx.getSplitKey().reset(); break; @@ -317,7 +317,7 @@ // Try compressing the page first and see if there is space available. 
if (ctx.getLeafFrame().compress() && ctx.getLeafFrame().hasSpaceInsert(tuple) == FrameOpSpaceStatus.SUFFICIENT_CONTIGUOUS_SPACE) { - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().insert(tuple, ctx.getLeafFrame().findInsertTupleIndex(tuple)); ctx.getSplitKey().reset(); } else { @@ -360,10 +360,10 @@ // Perform an update (delete + insert) if the updateTupleIndex != -1 if (updateTupleIndex != -1) { ITupleReference beforeTuple = ctx.getLeafFrame().getMatchingKeyTuple(tuple, updateTupleIndex); - foundModCallback(ctx, beforeTuple, tuple); + ctx.getModificationCallback().found(beforeTuple, tuple); ctx.getLeafFrame().delete(tuple, updateTupleIndex); } else { - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); } ctx.getLeafFrame().split(rightFrame, tuple, ctx.getSplitKey(), ctx, bufferCache); @@ -398,7 +398,7 @@ boolean restartOp = false; switch (spaceStatus) { case SUFFICIENT_INPLACE_SPACE: { - foundModCallback(ctx, beforeTuple, tuple); + ctx.getModificationCallback().found(beforeTuple, tuple); ctx.getLeafFrame().update(tuple, oldTupleIndex, true); ctx.getSplitKey().reset(); break; @@ -407,7 +407,7 @@ // TODO: avoid repeated calculation of tuple size // TODO: in-place update on expand // Delete the old tuple, compact the frame, and insert the new tuple. - foundModCallback(ctx, beforeTuple, tuple); + ctx.getModificationCallback().found(beforeTuple, tuple); ctx.getLeafFrame().delete(tuple, oldTupleIndex); ctx.getLeafFrame().compact(); ctx.getLeafFrame().ensureCapacity(bufferCache, tuple, ctx); @@ -417,14 +417,14 @@ break; } case SUFFICIENT_CONTIGUOUS_SPACE: { - foundModCallback(ctx, beforeTuple, tuple); + ctx.getModificationCallback().found(beforeTuple, tuple); ctx.getLeafFrame().update(tuple, oldTupleIndex, false); ctx.getSplitKey().reset(); break; } case SUFFICIENT_SPACE: { // Delete the old tuple, compact the frame, and insert the new tuple. 
- foundModCallback(ctx, beforeTuple, tuple); + ctx.getModificationCallback().found(beforeTuple, tuple); ctx.getLeafFrame().delete(tuple, oldTupleIndex); ctx.getLeafFrame().compact(); int targetTupleIndex = ctx.getLeafFrame().findInsertTupleIndex(tuple); @@ -759,12 +759,6 @@ modificationCallback, searchCallback); } - private BTreeOpContext createOpContext(IIndexAccessor accessor, IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] logTupleFields) { - return new BTreeOpContext(accessor, leafFrameFactory, interiorFrameFactory, freePageManager, cmpFactories, - modificationCallback, searchCallback, logTupleFields); - } - @SuppressWarnings("rawtypes") public String printTree(IBTreeLeafFrame leafFrame, IBTreeInteriorFrame interiorFrame, ISerializerDeserializer[] keySerdes) throws Exception { @@ -824,11 +818,6 @@ return new BTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback()); } - public BTreeAccessor createAccessor(IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] logTupleFields) { - return new BTreeAccessor(this, modificationCallback, searchCallback, logTupleFields); - } - // TODO: Class should be private. But currently we need to expose the // setOpContext() API to the LSM Tree for it to work correctly. 
@@ -847,12 +836,6 @@ ISearchOperationCallback searchCallback) { this.btree = btree; this.ctx = btree.createOpContext(this, modificationCalback, searchCallback); - } - - public BTreeAccessor(BTree btree, IModificationOperationCallback modificationCalback, - ISearchOperationCallback searchCallback, int[] logTupleFields) { - this.btree = btree; - this.ctx = btree.createOpContext(this, modificationCalback, searchCallback, logTupleFields); } public void reset(BTree btree, IModificationOperationCallback modificationCallback, @@ -1251,14 +1234,4 @@ public int getNumOfFilterFields() { return 0; } - - private void foundModCallback(BTreeOpContext ctx, ITupleReference before, ITupleReference after) - throws HyracksDataException { - if (ctx.getTupleWithNonIndexFields() == null) { - ctx.getModificationCallback().found(before, after); - } else { - ctx.getModificationCallback().found(before, ctx.getTupleWithNonIndexFields()); - } - } - } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java index 96370fe..c082ad7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTreeOpContext.java @@ -39,7 +39,6 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IModificationOperationCallback; import 
org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -57,7 +56,6 @@ private final IBTreeInteriorFrame interiorFrame; private final IPageManager freePageManager; private final ITreeIndexMetadataFrame metaFrame; - private PermutingTupleReference tupleWithNonIndexFields; // Optional, for filtered LSM Index transaction support private ITreeIndexFrameFactory leafFrameFactory; private IBTreeLeafFrame leafFrame; private IndexOperation op; @@ -113,15 +111,6 @@ // Debug this.validationInfos = new ArrayDeque<>(INIT_ARRAYLIST_SIZE); this.interiorFrameTuple = getInteriorFrame().createTupleReference(); - } - - public BTreeOpContext(IIndexAccessor accessor, ITreeIndexFrameFactory leafFrameFactory, - ITreeIndexFrameFactory interiorFrameFactory, IPageManager freePageManager, - IBinaryComparatorFactory[] cmpFactories, IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] nonIndexFields) { - this(accessor, leafFrameFactory, interiorFrameFactory, freePageManager, cmpFactories, modificationCallback, - searchCallback); - this.tupleWithNonIndexFields = new PermutingTupleReference(nonIndexFields); } @Override @@ -376,14 +365,6 @@ public void setLeafFrameFactory(ITreeIndexFrameFactory leafFrameFactory) { this.leafFrameFactory = leafFrameFactory; - } - - public ITupleReference getTupleWithNonIndexFields() { - return tupleWithNonIndexFields; - } - - public void resetNonIndexFieldsTuple(ITupleReference newValue) { - tupleWithNonIndexFields.reset(newValue); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java index eee43b5..523ed9b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/DiskBTree.java @@ -198,22 +198,11 @@ return new DiskBTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback()); } - @Override - public DiskBTreeAccessor createAccessor(IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] logTupleFields) { - return new DiskBTreeAccessor(this, modificationCallback, searchCallback, logTupleFields); - } - public class DiskBTreeAccessor extends BTreeAccessor { public DiskBTreeAccessor(DiskBTree btree, IModificationOperationCallback modificationCalback, ISearchOperationCallback searchCallback) { super(btree, modificationCalback, searchCallback); - } - - public DiskBTreeAccessor(DiskBTree btree, IModificationOperationCallback modificationCalback, - ISearchOperationCallback searchCallback, int[] logTupleFields) { - super(btree, modificationCalback, searchCallback, logTupleFields); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java new file mode 100644 index 0000000..7bb4e82 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/IExtendedModificationOperationCallback.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.common.api; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.common.IModificationOperationCallback; + +public interface IExtendedModificationOperationCallback extends IModificationOperationCallback { + /** + * Called after the action taken in found, to take action on a tuple that is not part of the index + * itself but is part of an ancillary structure that is updated alongside the index. An example would + * be a simple statistic on the index that records the minimum and maximum values. + * + * @param after + * The tuple to feed to the ancillary structure + * @throws HyracksDataException + */ + + void after(ITupleReference after) throws HyracksDataException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java new file mode 100644 index 0000000..dbefe74 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/ExtendedIndexAccessParameters.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.common.impls; + +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; +import org.apache.hyracks.storage.common.IIndex; +import org.apache.hyracks.storage.common.IIndexAccessParameters; +import org.apache.hyracks.storage.common.ISearchOperationCallback; + +public class ExtendedIndexAccessParameters extends IndexAccessParameters implements IIndexAccessParameters { + + protected final IExtendedModificationOperationCallback extendedModificationCallback; + // This map is used to put additional parameters to an index accessor. 
+ + public ExtendedIndexAccessParameters(IExtendedModificationOperationCallback extendedModificationCallback, + ISearchOperationCallback searchOperationCallback) { + super(extendedModificationCallback, searchOperationCallback); + this.extendedModificationCallback = extendedModificationCallback; + } + + @Override + public IExtendedModificationOperationCallback getModificationCallback() { + return extendedModificationCallback; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java index 2edea70..0068f4f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpIndexAccessParameters.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.Map; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.common.IIndexAccessParameters; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; @@ -34,7 +35,7 @@ } @Override - public IModificationOperationCallback getModificationCallback() { + public IExtendedModificationOperationCallback getModificationCallback() { return NoOpOperationCallback.INSTANCE; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java index 15aba57..85417f2 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/NoOpOperationCallback.java @@ -21,13 +21,15 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; /** * Dummy operation callback that simply does nothing. */ -public enum NoOpOperationCallback implements IModificationOperationCallback, ISearchOperationCallback { +public enum NoOpOperationCallback + implements IModificationOperationCallback, ISearchOperationCallback, IExtendedModificationOperationCallback { INSTANCE; @Override @@ -59,4 +61,9 @@ public void complete(ITupleReference tuple) throws HyracksDataException { // Do nothing. } + + @Override + public void after(ITupleReference tuple) throws HyracksDataException { + //Do nothing. 
+ } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java index ff47d27..636e4f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/ophelpers/IndexOperation.java @@ -26,6 +26,7 @@ UPDATE, UPSERT, SEARCH, + FILTER_MOD, DISKORDERSCAN, PHYSICALDELETE, NOOP, diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java index 1c74275..7f53ed5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/ExternalBTreeWithBuddyOpContext.java @@ -20,6 +20,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; @@ -65,7 +66,7 @@ // This should never be needed for disk only indexes @Override - public IModificationOperationCallback getModificationCallback() { + public 
IExtendedModificationOperationCallback getModificationCallback() { return null; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java index 4578eb3..41a11e6 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java @@ -32,11 +32,13 @@ import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.btree.impls.BTree.BTreeAccessor; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.AbstractLSMWithBloomFilterDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; @@ -151,7 +153,6 @@ if (ctx.getIndexTuple() != null) { ctx.getIndexTuple().reset(tuple); indexTuple = ctx.getIndexTuple(); - ctx.getCurrentMutableBTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple); } else { indexTuple = tuple; } @@ -303,7 +304,8 @@ List<ITupleReference> filterTuples = new ArrayList<>(); 
filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } // Write metadata from memory component to disk @@ -353,7 +355,8 @@ filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); } - getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getMetadataHolder()); } @@ -396,8 +399,9 @@ int numBloomFilterKeyFields = hasBloomFilter ? 
((LSMBTreeWithBloomFilterDiskComponentFactory) componentFactory).getBloomFilterKeyFields().length : 0; return new LSMBTreeOpContext(this, memoryComponents, insertLeafFrameFactory, deleteLeafFrameFactory, - iap.getModificationCallback(), iap.getSearchOperationCallback(), numBloomFilterKeyFields, - getTreeFields(), getFilterFields(), getHarness(), getFilterCmpFactories(), tracer); + (IExtendedModificationOperationCallback) iap.getModificationCallback(), + iap.getSearchOperationCallback(), numBloomFilterKeyFields, getTreeFields(), getFilterFields(), + getHarness(), getFilterCmpFactories(), tracer); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java index 7969ba3..1cfc414 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeOpContext.java @@ -29,8 +29,10 @@ import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext; import org.apache.hyracks.storage.am.btree.impls.BTreeRangeSearchCursor; import org.apache.hyracks.storage.am.btree.impls.RangePredicate; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; +import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; @@ -69,7 +71,7 @@ public 
LSMBTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, ITreeIndexFrameFactory insertLeafFrameFactory, ITreeIndexFrameFactory deleteLeafFrameFactory, - IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, + IExtendedModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, int numBloomFilterKeyFields, int[] btreeFields, int[] filterFields, ILSMHarness lsmHarness, IBinaryComparatorFactory[] filterCmpFactories, ITracer tracer) { super(index, btreeFields, filterFields, filterCmpFactories, searchCallback, modificationCallback, tracer); @@ -90,14 +92,9 @@ for (int i = 0; i < mutableComponents.size(); i++) { LSMBTreeMemoryComponent mutableComponent = (LSMBTreeMemoryComponent) mutableComponents.get(i); mutableBTrees[i] = mutableComponent.getIndex(); - if (allFields != null) { - mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(modificationCallback, - NoOpOperationCallback.INSTANCE, allFields); - } else { - IIndexAccessParameters iap = - new IndexAccessParameters(modificationCallback, NoOpOperationCallback.INSTANCE); - mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(iap); - } + IIndexAccessParameters iap = + new IndexAccessParameters(modificationCallback, NoOpOperationCallback.INSTANCE); + mutableBTreeAccessors[i] = mutableBTrees[i].createAccessor(iap); mutableBTreeOpCtxs[i] = mutableBTreeAccessors[i].getOpContext(); } this.insertLeafFrameFactory = insertLeafFrameFactory; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java index b92e0d2..34f8855 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilter.java @@ -21,14 +21,16 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; public interface ILSMComponentFilter { - void update(ITupleReference tuple, MultiComparator cmp) throws HyracksDataException; - boolean satisfy(ITupleReference min, ITupleReference max, MultiComparator cmp) throws HyracksDataException; + void update(ITupleReference tuple, MultiComparator cmp, IExtendedModificationOperationCallback opCallback) + throws HyracksDataException; + ITupleReference getMinTuple(); ITupleReference getMaxTuple(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java index f310b4e..64b0c44 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentFilterManager.java @@ -22,11 +22,13 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndex; public interface ILSMComponentFilterManager { - void updateFilter(ILSMComponentFilter 
filter, List<ITupleReference> filterTuples) throws HyracksDataException; + void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples, + IExtendedModificationOperationCallback opCallback) throws HyracksDataException; boolean readFilter(ILSMComponentFilter filter, ITreeIndex index) throws HyracksDataException; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java index 0e1a5e4..61ef6cf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexAccessor.java @@ -263,4 +263,11 @@ * @throws HyracksDataException */ void deleteComponents(Predicate<ILSMComponent> predicate) throws HyracksDataException; + + /** + * Update the filter of an LSM index + * @param tuple + * @throws HyracksDataException + */ + void updateFilter(ITupleReference tuple) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java index 79b3262..b34b403 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndexOperationContext.java @@ -20,6 +20,7 @@ import java.util.List; +import 
org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; @@ -35,7 +36,7 @@ ISearchOperationCallback getSearchOperationCallback(); - IModificationOperationCallback getModificationCallback(); + IExtendedModificationOperationCallback getModificationCallback(); void setCurrentMutableComponentId(int currentMutableComponentId); @@ -84,6 +85,14 @@ */ boolean isTracingEnabled(); + boolean isFilterSkipped(); + + void setFilterSkip(boolean skip); + + boolean isRecovery(); + + void setRecovery(boolean recovery); + /** * @return the IO Operation type associated with this context */ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index 0368a09..bd91094 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -662,10 +662,15 @@ @Override public void updateFilter(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException { - if (ctx.getFilterTuple() != null) { - ctx.getFilterTuple().reset(tuple); - memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(ctx.getFilterTuple(), - ctx.getFilterCmp()); + if (ctx.getFilterTuple() != null && !ctx.isFilterSkipped()) { + if (ctx.isRecovery()) { + memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter().update(tuple, + 
ctx.getFilterCmp(), ctx.getModificationCallback()); + } else { + ctx.getFilterTuple().reset(tuple); + memoryComponents.get(currentMutableComponentId.get()).getLSMComponentFilter() + .update(ctx.getFilterTuple(), ctx.getFilterCmp(), ctx.getModificationCallback()); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java index 72c2b07..17aa394 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexOperationContext.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -43,24 +44,25 @@ protected final PermutingTupleReference indexTuple; protected final MultiComparator filterCmp; protected final PermutingTupleReference filterTuple; - protected final int[] allFields; protected final List<ILSMComponent> componentHolder; protected final List<ILSMDiskComponent> componentsToBeMerged; protected final List<ILSMDiskComponent> componentsToBeReplicated; protected final ISearchOperationCallback searchCallback; - protected final IModificationOperationCallback modificationCallback; + protected final IExtendedModificationOperationCallback modificationCallback; 
protected IndexOperation op; protected boolean accessingComponents = false; protected ISearchPredicate searchPredicate; protected final ITracer tracer; protected final long traceCategory; private long enterExitTime = 0L; + protected boolean skipFilter = false; + protected boolean recovery = false; private LSMIOOperationType ioOpType = LSMIOOperationType.NOOP; private ILSMDiskComponent newDiskComponent; public AbstractLSMIndexOperationContext(ILSMIndex index, int[] treeFields, int[] filterFields, IBinaryComparatorFactory[] filterCmpFactories, ISearchOperationCallback searchCallback, - IModificationOperationCallback modificationCallback, ITracer tracer) { + IExtendedModificationOperationCallback modificationCallback, ITracer tracer) { this.index = index; this.searchCallback = searchCallback; this.modificationCallback = modificationCallback; @@ -71,18 +73,10 @@ indexTuple = new PermutingTupleReference(treeFields); filterCmp = MultiComparator.create(filterCmpFactories); filterTuple = new PermutingTupleReference(filterFields); - allFields = new int[treeFields.length + filterFields.length]; - for (int i = 0; i < treeFields.length; i++) { - allFields[i] = treeFields[i]; - } - for (int i = treeFields.length; i < treeFields.length + filterFields.length; i++) { - allFields[i] = filterFields[i - treeFields.length]; - } } else { indexTuple = null; filterCmp = null; filterTuple = null; - allFields = null; } this.tracer = tracer; this.traceCategory = tracer.getRegistry().get("op-ctx"); @@ -137,7 +131,7 @@ } @Override - public IModificationOperationCallback getModificationCallback() { + public IExtendedModificationOperationCallback getModificationCallback() { return modificationCallback; } @@ -196,6 +190,25 @@ } @Override + public boolean isFilterSkipped() { + return skipFilter; + } + + @Override + public void setFilterSkip(boolean skip) { + this.skipFilter = skip; + } + + @Override + public boolean isRecovery() { + return recovery; + } + + @Override + public void 
setRecovery(boolean recovery) { + this.recovery = recovery; + } + public LSMIOOperationType getIoOperationType() { return ioOpType; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java index a992c5e..7bb24dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ComponentReplacementContext.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -91,7 +92,7 @@ } @Override - public IModificationOperationCallback getModificationCallback() { + public IExtendedModificationOperationCallback getModificationCallback() { return null; } @@ -216,6 +217,26 @@ } @Override + public boolean isFilterSkipped() { + return false; + } + + @Override + public void setFilterSkip(boolean skip) { + //not used in recovery + } + + @Override + public boolean isRecovery() { + return false; + } + + @Override + public void setRecovery(boolean recovery) { + //not used in recovery + } + + @Override public void destroy() throws HyracksDataException { // No Op.. 
Nothing to destroy } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java index 625f81e..43d2b0d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/FilterBulkLoader.java @@ -21,6 +21,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.api.ITreeIndex; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; @@ -76,6 +77,6 @@ private void updateFilter(ITupleReference tuple) throws HyracksDataException { filterTuple.reset(tuple); - filter.update(filterTuple, filterCmp); + filter.update(filterTuple, filterCmp, NoOpOperationCallback.INSTANCE); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java index 5f01550..6ccb114 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilter.java @@ -23,6 +23,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleReference; import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; @@ -63,10 +64,14 @@ } @Override - public void update(ITupleReference tuple, MultiComparator cmp) throws HyracksDataException { + public void update(ITupleReference tuple, MultiComparator cmp, IExtendedModificationOperationCallback opCallback) + throws HyracksDataException { + boolean logged = false; if (minTuple == null) { int numBytes = tupleWriter.bytesRequired(tuple); minTupleBytes = new byte[numBytes]; + opCallback.after(tuple); + logged = true; tupleWriter.writeTuple(tuple, minTupleBytes, 0); minTupleBuf = ByteBuffer.wrap(minTupleBytes); minTuple = tupleWriter.createTupleReference(); @@ -74,6 +79,8 @@ } else { int c = cmp.compare(tuple, minTuple); if (c < 0) { + opCallback.after(tuple); + logged = true; int numBytes = tupleWriter.bytesRequired(tuple); if (minTupleBytes.length < numBytes) { minTupleBytes = new byte[numBytes]; @@ -88,6 +95,9 @@ if (maxTuple == null) { int numBytes = tupleWriter.bytesRequired(tuple); maxTupleBytes = new byte[numBytes]; + if (!logged) { + opCallback.after(tuple); + } tupleWriter.writeTuple(tuple, maxTupleBytes, 0); maxTupleBuf = ByteBuffer.wrap(maxTupleBytes); maxTuple = tupleWriter.createTupleReference(); @@ -95,6 +105,9 @@ } else { int c = cmp.compare(tuple, maxTuple); if (c > 0) { + if (!logged) { + opCallback.after(tuple); + } int numBytes = 
tupleWriter.bytesRequired(tuple); if (maxTupleBytes.length < numBytes) { maxTupleBytes = new byte[numBytes]; diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java index 90ca7d6..c2b4842 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentFilterManager.java @@ -23,9 +23,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilter; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterManager; @@ -42,11 +44,11 @@ } @Override - public void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples) - throws HyracksDataException { + public void updateFilter(ILSMComponentFilter filter, List<ITupleReference> filterTuples, + IExtendedModificationOperationCallback opCallback) throws HyracksDataException { MultiComparator filterCmp = MultiComparator.create(filter.getFilterCmpFactories()); for (ITupleReference tuple : filterTuples) { - 
filter.update(tuple, filterCmp); + filter.update(tuple, filterCmp, opCallback); } } @@ -78,7 +80,7 @@ List<ITupleReference> filterTuples = new ArrayList<>(); filterTuples.add(filterFrame.getMinTuple()); filterTuples.add(filterFrame.getMaxTuple()); - updateFilter(filter, filterTuples); + updateFilter(filter, filterTuples, NoOpOperationCallback.INSTANCE); return true; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 9794a98..1548f86 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -205,6 +205,7 @@ return cursorFactory.create(ctx); } + @Override public void updateFilter(ITupleReference tuple) throws HyracksDataException { ctx.setOperation(IndexOperation.UPSERT); lsmHarness.updateFilter(ctx, tuple); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java index a395e67..5fda514 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndex.java @@ -35,6 +35,7 @@ import org.apache.hyracks.storage.am.btree.util.BTreeUtils; import 
org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -155,13 +156,12 @@ if (ctx.getIndexTuple() != null) { ctx.getIndexTuple().reset(tuple); indexTuple = ctx.getIndexTuple(); - ((InMemoryInvertedIndexAccessor) (ctx.getCurrentMutableInvIndexAccessors())).resetLogTuple(tuple); } else { indexTuple = tuple; } - ctx.getModificationCallback().before(tuple); - ctx.getModificationCallback().found(null, tuple); + ctx.getModificationCallback().before(indexTuple); + ctx.getModificationCallback().found(null, indexTuple); switch (ctx.getOperation()) { case INSERT: // Insert into the in-memory inverted index. @@ -336,7 +336,7 @@ List<ITupleReference> filterTuples = new ArrayList<>(); filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples); + filterManager.updateFilter(component.getLSMComponentFilter(), filterTuples, NoOpOperationCallback.INSTANCE); filterManager.writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } flushingComponent.getMetadata().copy(component.getMetadata()); @@ -399,7 +399,8 @@ filterTuples.add(max); } } - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } componentBulkLoader.end(); diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java index d7408ff..247e44c 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexAccessor.java @@ -208,6 +208,11 @@ } @Override + public void updateFilter(ITupleReference tuple) throws HyracksDataException { + lsmHarness.updateFilter(ctx, tuple); + } + + @Override public void scanDiskComponents(IIndexCursor cursor) throws HyracksDataException { throw HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java index e7a725e..e238eb2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java @@ -24,6 +24,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.CleanupUtils; +import 
org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -54,18 +55,14 @@ IIndexAccessParameters iap, int[] invertedIndexFields, int[] filterFields, IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) throws HyracksDataException { super(index, invertedIndexFields, filterFields, filterComparatorFactories, iap.getSearchOperationCallback(), - iap.getModificationCallback(), tracer); + (IExtendedModificationOperationCallback) iap.getModificationCallback(), tracer); mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()]; deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()]; for (int i = 0; i < mutableComponents.size(); i++) { LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents.get(i); - if (allFields != null) { - mutableInvIndexAccessors[i] = mutableComponent.getIndex().createAccessor(iap, allFields); - } else { - mutableInvIndexAccessors[i] = - mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - } + mutableInvIndexAccessors[i] = + mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); deletedKeysBTreeAccessors[i] = mutableComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java index e74733b..4970ccb 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndex.java @@ -182,13 +182,6 @@ (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } - public InMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields) - throws HyracksDataException { - return new InMemoryInvertedIndexAccessor(this, - new InMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), nonIndexFields, - (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); - } - @Override public IBufferCache getBufferCache() { return btree.getBufferCache(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java index 2a35301..e48f16f 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/InMemoryInvertedIndexAccessor.java @@ -56,16 +56,6 @@ this.btreeAccessor = index.getBTree().createAccessor(NoOpIndexAccessParameters.INSTANCE); } - public InMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, - int[] nonIndexFields, IHyracksTaskContext ctx) throws HyracksDataException { - this.ctx = ctx; - this.opCtx = opCtx; - this.index = index; - this.searcher = 
null; - this.btreeAccessor = index.getBTree().createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE, nonIndexFields); - } - @Override public void insert(ITupleReference tuple) throws HyracksDataException { opCtx.setOperation(IndexOperation.INSERT); @@ -135,10 +125,6 @@ return new TOccurrenceSearcher(index, ctx); } return null; - } - - public void resetLogTuple(ITupleReference newTuple) { - btreeAccessor.getOpContext().resetNonIndexFieldsTuple(newTuple); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java index 986ceac..806bbf5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndex.java @@ -91,14 +91,6 @@ } @Override - public PartitionedInMemoryInvertedIndexAccessor createAccessor(IIndexAccessParameters iap, int[] nonIndexFields) - throws HyracksDataException { - return new PartitionedInMemoryInvertedIndexAccessor(this, - new PartitionedInMemoryInvertedIndexOpContext(btree, tokenCmpFactories, tokenizerFactory), - nonIndexFields, iap); - } - - @Override public boolean openInvertedListPartitionCursors(IInvertedIndexSearcher searcher, IIndexOperationContext ictx, short numTokensLowerBound, short numTokensUpperBound, InvertedListPartitions invListPartitions) throws HyracksDataException { diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java index b5044d0..64dc4c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/inmemory/PartitionedInMemoryInvertedIndexAccessor.java @@ -34,12 +34,6 @@ super(index, opCtx, (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); } - public PartitionedInMemoryInvertedIndexAccessor(InMemoryInvertedIndex index, IIndexOperationContext opCtx, - int[] nonIndexFields, IIndexAccessParameters iap) throws HyracksDataException { - super(index, opCtx, nonIndexFields, - (IHyracksTaskContext) iap.getParameters().get(HyracksConstants.HYRACKS_TASK_CONTEXT)); - } - protected IInvertedIndexSearcher createSearcher() throws HyracksDataException { return new PartitionedTOccurrenceSearcher(index, ctx); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java index 0dc25ec..e36abb4 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/AbstractLSMRTree.java @@ -29,6 +29,7 @@ import 
org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.btree.impls.BTree; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.IPageManager; import org.apache.hyracks.storage.am.common.api.ITreeIndex; @@ -201,13 +202,12 @@ if (ctx.getIndexTuple() != null) { ctx.getIndexTuple().reset(tuple); indexTuple = ctx.getIndexTuple(); - ctx.getCurrentMutableRTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple); } else { indexTuple = tuple; } ctx.getModificationCallback().before(indexTuple); - ctx.getModificationCallback().found(null, tuple); + ctx.getModificationCallback().found(null, indexTuple); if (ctx.getOperation() == IndexOperation.INSERT) { ctx.getCurrentMutableRTreeAccessor().insert(indexTuple); } else { @@ -230,8 +230,9 @@ @Override protected LSMRTreeOpContext createOpContext(IIndexAccessParameters iap) { return new LSMRTreeOpContext(this, memoryComponents, rtreeLeafFrameFactory, rtreeInteriorFrameFactory, - btreeLeafFrameFactory, iap.getModificationCallback(), iap.getSearchOperationCallback(), getTreeFields(), - getFilterFields(), getHarness(), comparatorFields, linearizerArray, getFilterCmpFactories(), tracer); + btreeLeafFrameFactory, (IExtendedModificationOperationCallback) iap.getModificationCallback(), + iap.getSearchOperationCallback(), getTreeFields(), getFilterFields(), getHarness(), comparatorFields, + linearizerArray, getFilterCmpFactories(), tracer); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java index 7c1467c..4510618 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTree.java @@ -36,6 +36,7 @@ import org.apache.hyracks.storage.am.common.api.IIndexOperationContext; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.common.tuples.DualTupleReference; import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; @@ -151,7 +152,8 @@ List<ITupleReference> filterTuples = new ArrayList<>(); filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } // Note. 
If we change the filter to write to metadata object, we don't need the if block above @@ -289,7 +291,8 @@ filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); } - getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(mergedComponent.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(mergedComponent.getLSMComponentFilter(), mergedComponent.getMetadataHolder()); } @@ -389,7 +392,6 @@ if (ctx.getIndexTuple() != null) { ctx.getIndexTuple().reset(tuple); indexTuple = ctx.getIndexTuple(); - ctx.getCurrentMutableRTreeAccessor().getOpContext().resetNonIndexFieldsTuple(tuple); } else { indexTuple = tuple; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java index 858f6e0..eb79960 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java @@ -26,6 +26,7 @@ import org.apache.hyracks.api.util.CleanupUtils; import org.apache.hyracks.storage.am.btree.impls.BTree; import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; @@ -57,7 +58,7 @@ public 
LSMRTreeOpContext(ILSMIndex index, List<ILSMMemoryComponent> mutableComponents, ITreeIndexFrameFactory rtreeLeafFrameFactory, ITreeIndexFrameFactory rtreeInteriorFrameFactory, - ITreeIndexFrameFactory btreeLeafFrameFactory, IModificationOperationCallback modificationCallback, + ITreeIndexFrameFactory btreeLeafFrameFactory, IExtendedModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback, int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness, int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray, IBinaryComparatorFactory[] filterComparatorFactories, ITracer tracer) { @@ -69,13 +70,7 @@ btreeOpContexts = new BTreeOpContext[mutableComponents.size()]; for (int i = 0; i < mutableComponents.size(); i++) { LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) mutableComponents.get(i); - if (allFields != null) { - mutableRTreeAccessors[i] = mutableComponent.getIndex().createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE, allFields); - } else { - mutableRTreeAccessors[i] = - mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); - } + mutableRTreeAccessors[i] = mutableComponent.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); mutableBTreeAccessors[i] = mutableComponent.getBuddyIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE); @@ -146,4 +141,4 @@ throw HyracksDataException.create(failure); } } -} \ No newline at end of file +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java index 3bb94ed..a3ba4b1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeWithAntiMatterTuples.java @@ -32,6 +32,7 @@ import org.apache.hyracks.storage.am.btree.impls.RangePredicate; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IComponentFilterHelper; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentFilterFrameFactory; @@ -174,7 +175,8 @@ List<ITupleReference> filterTuples = new ArrayList<>(); filterTuples.add(flushingComponent.getLSMComponentFilter().getMinTuple()); filterTuples.add(flushingComponent.getLSMComponentFilter().getMaxTuple()); - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } flushingComponent.getMetadata().copy(component.getMetadata()); @@ -249,7 +251,8 @@ filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMinTuple()); filterTuples.add(mergeOp.getMergingComponents().get(i).getLSMComponentFilter().getMaxTuple()); } - getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples); + getFilterManager().updateFilter(component.getLSMComponentFilter(), filterTuples, + NoOpOperationCallback.INSTANCE); getFilterManager().writeFilter(component.getLSMComponentFilter(), component.getMetadataHolder()); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java index 5582075..f6f0a3d 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTree.java @@ -157,12 +157,6 @@ modificationCallback); } - private RTreeOpContext createOpContext(IModificationOperationCallback modificationCallback, int[] nonIndexFields) { - return new RTreeOpContext((IRTreeLeafFrame) leafFrameFactory.createFrame(), - (IRTreeInteriorFrame) interiorFrameFactory.createFrame(), freePageManager, cmpFactories, - modificationCallback, nonIndexFields); - } - private ICachedPage findLeaf(RTreeOpContext ctx) throws HyracksDataException { int pageId = rootPage; boolean writeLatched = false; @@ -305,7 +299,7 @@ if (!isLeaf) { ctx.getInteriorFrame().insert(tuple, -1); } else { - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().insert(tuple, -1); } succeeded = true; @@ -330,7 +324,7 @@ ctx.getInteriorFrame().insert(tuple, -1); } else { ctx.getLeafFrame().compact(); - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().insert(tuple, -1); } succeeded = true; @@ -368,7 +362,7 @@ rightFrame.setPage(rightNode); rightFrame.initBuffer((byte) 0); rightFrame.setRightPage(ctx.getInteriorFrame().getRightPage()); - foundModCallback(ctx, null, tuple); + ctx.getModificationCallback().found(null, tuple); ctx.getLeafFrame().split(rightFrame, tuple, ctx.getSplitKey(), ctx, bufferCache); ctx.getLeafFrame().setRightPage(rightPageId); } @@ -761,11 +755,6 @@ return new RTreeAccessor(this, iap.getModificationCallback(), iap.getSearchOperationCallback()); } - public RTreeAccessor createAccessor(IModificationOperationCallback modificationCallback, - ISearchOperationCallback 
searchCallback, int[] nonIndexFields) { - return new RTreeAccessor(this, modificationCallback, searchCallback, nonIndexFields); - } - public class RTreeAccessor implements ITreeIndexAccessor { private RTree rtree; private RTreeOpContext ctx; @@ -775,12 +764,6 @@ ISearchOperationCallback searchCallback) { this.rtree = rtree; this.ctx = rtree.createOpContext(modificationCallback); - } - - public RTreeAccessor(RTree rtree, IModificationOperationCallback modificationCallback, - ISearchOperationCallback searchCallback, int[] nonIndexFields) { - this.rtree = rtree; - this.ctx = rtree.createOpContext(modificationCallback, nonIndexFields); } public void reset(RTree rtree, IModificationOperationCallback modificationCallback) { @@ -1125,14 +1108,5 @@ @Override public int getNumOfFilterFields() { return 0; - } - - private void foundModCallback(RTreeOpContext ctx, ITupleReference before, ITupleReference after) - throws HyracksDataException { - if (ctx.getTupleWithNonIndexFields() != null) { - ctx.getModificationCallback().found(before, ctx.getTupleWithNonIndexFields()); - } else { - ctx.getModificationCallback().found(before, after); - } } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java index 46e6b22..76071f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/impls/RTreeOpContext.java @@ -30,7 +30,6 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor; import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import 
org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference; import org.apache.hyracks.storage.am.rtree.api.IRTreeInteriorFrame; import org.apache.hyracks.storage.am.rtree.api.IRTreeLeafFrame; import org.apache.hyracks.storage.common.IModificationOperationCallback; @@ -61,7 +60,6 @@ private IModificationOperationCallback modificationCallback; - private PermutingTupleReference tupleWithNonIndexFields; private boolean destroyed = false; public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame, IPageManager freePageManager, @@ -81,13 +79,6 @@ pathList = new PathList(INITIAL_HEIGHT, INITIAL_HEIGHT); NSNUpdates = new ArrayList<>(); LSNUpdates = new ArrayList<>(); - } - - public RTreeOpContext(IRTreeLeafFrame leafFrame, IRTreeInteriorFrame interiorFrame, IPageManager freePageManager, - IBinaryComparatorFactory[] cmpFactories, IModificationOperationCallback modificationCallback, - int[] nonIndexFields) { - this(leafFrame, interiorFrame, freePageManager, cmpFactories, modificationCallback); - tupleWithNonIndexFields = new PermutingTupleReference(nonIndexFields); } public ITupleReference getTuple() { @@ -199,14 +190,6 @@ public RTreeCursorInitialState getCursorInitialState() { return cursorInitialState; - } - - public ITupleReference getTupleWithNonIndexFields() { - return tupleWithNonIndexFields; - } - - public void resetNonIndexFieldsTuple(ITupleReference newValue) { - tupleWithNonIndexFields.reset(newValue); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java index 1c81f68..e915112 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java +++ 
b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/btree/AbstractModificationOperationCallbackTest.java @@ -23,6 +23,7 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.dataflow.common.utils.TupleUtils; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig; @@ -84,7 +85,7 @@ } } - private class VeriyfingModificationCallback implements IModificationOperationCallback { + private class VeriyfingModificationCallback implements IExtendedModificationOperationCallback { @Override public void before(ITupleReference tuple) throws HyracksDataException { @@ -100,6 +101,11 @@ } Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, after)); } + + @Override + public void after(ITupleReference tuple) throws HyracksDataException { + Assert.assertEquals(0, cmp.compare(AbstractModificationOperationCallbackTest.this.tuple, tuple)); + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java index d796ece..3a5c8fe 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/storage/am/common/TestOperationCallback.java @@ -22,10 +22,11 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; 
+import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.common.IModificationOperationCallback; import org.apache.hyracks.storage.common.ISearchOperationCallback; -public enum TestOperationCallback implements ISearchOperationCallback, IModificationOperationCallback { +public enum TestOperationCallback implements ISearchOperationCallback, IExtendedModificationOperationCallback { INSTANCE; private static final int RANDOM_SEED = 50; @@ -65,4 +66,9 @@ public void complete(ITupleReference tuple) throws HyracksDataException { // Do nothing. } + + @Override + public void after(ITupleReference tuple) throws HyracksDataException { + // Do nothing. + } } diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java index d42c3b6..fc852cd 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/LSMBTreeUpdateInPlaceTest.java @@ -29,6 +29,7 @@ import org.apache.hyracks.dataflow.common.utils.TupleUtils; import org.apache.hyracks.storage.am.btree.AbstractOperationCallbackTest; import org.apache.hyracks.storage.am.common.api.IBTreeIndexTupleReference; +import org.apache.hyracks.storage.am.common.api.IExtendedModificationOperationCallback; import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.config.AccessMethodTestsConfig; @@ -153,7 +154,7 @@ test((IIndexAccessor a) -> 
a.upsert(tuple), (IIndexAccessor a) -> a.upsert(tuple)); } - private class VerifyingUpdateModificationCallback implements IModificationOperationCallback { + private class VerifyingUpdateModificationCallback implements IExtendedModificationOperationCallback { private final ITupleReference tuple; @@ -176,6 +177,11 @@ } Assert.assertEquals(0, cmp.compare(this.tuple, after)); } + + @Override + public void after(ITupleReference tuple) { + //Nothing to do there, not testing filters + } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1857 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ie9e7795d9c8c212e8610dcb9bb5d26ec9fbbee8a Gerrit-PatchSet: 46 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Ian Maxon <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Ian Maxon <[email protected]> Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: abdullah alamoudi <[email protected]>
