Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2444][STO] Avoid Using System Clock in Storage ......................................................................
[ASTERIXDB-2444][STO] Avoid Using System Clock in Storage - user model changes: no - storage format changes: yes - interface changes: yes Details: - Replace the usage of system clock timestamps in LSM index component file names by a sequencer. The next sequence id to use is determined by checking the list of existing components on disk. Note that due to a rollback, an index checkpoint file may have a last valid component sequence that is greater than what is on disk. This should not cause any issues since only components that have a sequence greater than the one that appears in the checkpoint will be deleted. - Replace the usage of system clock timestamps in LSM index component ids by a monotonically increasing sequencer. The sequencer is initialized after restarts by the last valid component id that appears in the index checkpoint. - Refactor the logic to generate flush/merge file names. - Refactor the logic to check invalid components. - Adapt test cases to new naming format. Change-Id: I9dff8ffb38ce8064a199d03b070ed1f5b924b8a4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2927 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java M asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java A hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java M 
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java M hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java M hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java 24 files changed, 517 insertions(+), 498 deletions(-) Approvals: Anon. E. Moose #1000171: abdullah alamoudi: Looks good to me, approved Jenkins: Verified; No violations found; Verified Murtadha Hubail: Looks good to me, but someone else must approve diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java index 64d8e93..3c62d99 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.Comparator; import java.util.List; -import java.util.Optional; import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IndexCheckpoint; @@ -56,7 +55,7 @@ } @Override - public synchronized void init(String lastComponentTimestamp, long lsn) throws HyracksDataException { + public synchronized void init(long validComponentSequence, long lsn) throws HyracksDataException { List<IndexCheckpoint> checkpoints; try { checkpoints = getCheckpoints(); @@ -67,25 +66,24 @@ LOGGER.warn(() -> "Checkpoints found on initializing: " + indexPath); delete(); } - IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lastComponentTimestamp, lsn); + IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(validComponentSequence, lsn); 
persist(firstCheckpoint); } @Override - public synchronized void replicated(String componentTimestamp, long masterLsn, long componentId) + public synchronized void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException { final Long localLsn = getLatest().getMasterNodeFlushMap().get(masterLsn); if (localLsn == null) { throw new IllegalStateException("Component flushed before lsn mapping was received"); } - flushed(componentTimestamp, localLsn, componentId); + flushed(componentSequence, localLsn, componentId); } @Override - public synchronized void flushed(String componentTimestamp, long lsn, long componentId) - throws HyracksDataException { + public synchronized void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException { final IndexCheckpoint latest = getLatest(); - IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentTimestamp, componentId); + IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, componentSequence, componentId); persist(nextCheckpoint); deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS); } @@ -95,7 +93,7 @@ final IndexCheckpoint latest = getLatest(); latest.getMasterNodeFlushMap().put(masterLsn, localLsn); final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), - latest.getValidComponentTimestamp(), latest.getLastComponentId()); + latest.getValidComponentSequence(), latest.getLastComponentId()); persist(next); notifyAll(); } @@ -119,9 +117,8 @@ } @Override - public Optional<String> getValidComponentTimestamp() throws HyracksDataException { - String validComponentTimestamp = getLatest().getValidComponentTimestamp(); - return validComponentTimestamp != null ? 
Optional.of(validComponentTimestamp) : Optional.empty(); + public long getValidComponentSequence() throws HyracksDataException { + return getLatest().getValidComponentSequence(); } @Override @@ -153,18 +150,17 @@ @Override public synchronized void setLastComponentId(long componentId) throws HyracksDataException { final IndexCheckpoint latest = getLatest(); - final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), - latest.getValidComponentTimestamp(), componentId); + final IndexCheckpoint next = + IndexCheckpoint.next(latest, latest.getLowWatermark(), latest.getValidComponentSequence(), componentId); persist(next); } @Override - public synchronized void advanceValidComponentTimestamp(String timestamp) throws HyracksDataException { + public synchronized void advanceValidComponentSequence(long componentSequence) throws HyracksDataException { final IndexCheckpoint latest = getLatest(); - if (latest.getValidComponentTimestamp() == null - || timestamp.compareTo(latest.getValidComponentTimestamp()) > 0) { - final IndexCheckpoint next = - IndexCheckpoint.next(latest, latest.getLowWatermark(), timestamp, latest.getLastComponentId()); + if (componentSequence > latest.getValidComponentSequence()) { + final IndexCheckpoint next = IndexCheckpoint.next(latest, latest.getLowWatermark(), componentSequence, + latest.getLastComponentId()); persist(next); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java index 108afa7..f4c0ea6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java @@ -28,7 +28,7 @@ import org.apache.hyracks.api.io.IJsonSerializable; import org.apache.hyracks.api.io.IPersistedResourceRegistry; import 
org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; @@ -64,8 +64,8 @@ // Whenever this is called, it resets the counter // However, the counters for the failed operations are never reset since we expect them // To be always 0 - return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, - getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider()); + return new TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(), + getIndexCheckpointManagerProvider()); } public int getTotalFlushes() { @@ -125,9 +125,9 @@ public class TestLsmIoOpCallback extends LSMIOOperationCallback { private final TestLsmBtree lsmBtree; - public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentId id, + public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, ILSMComponentIdGenerator idGenerator, IIndexCheckpointManagerProvider checkpointManagerProvider) { - super(dsInfo, index, id, checkpointManagerProvider); + super(dsInfo, index, idGenerator, checkpointManagerProvider); lsmBtree = (TestLsmBtree) index; } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java index 694a0c7..54ae683 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java @@ -18,18 
+18,11 @@ */ package org.apache.asterix.test.storage; -import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT; - import java.io.File; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.text.Format; -import java.text.SimpleDateFormat; import java.util.Arrays; -import java.util.Date; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; import org.apache.asterix.common.TestDataUtil; @@ -129,17 +122,15 @@ TestDataUtil.upsertData(datasetName, 100); ncAppCtx.getDatasetLifecycleManager().flushDataset(dataset.getDatasetId(), false); - // create new invalid component with a timestamp > checkpoint valid component timestamp (i.e. in the future) - Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT); - Date futureTime = new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5)); - String invalidComponentTimestamp = - formatter.format(futureTime) + AbstractLSMIndexFileManager.DELIMITER + formatter.format(futureTime); + // create new invalid component sequence with a sequence > checkpoint valid component sequence + String invalidComponentId = "1000"; + String invalidComponentRange = invalidComponentId + AbstractLSMIndexFileManager.DELIMITER + invalidComponentId; FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath); String indexDir = indexDirRef.getFile().getAbsolutePath(); // create the invalid component files - Path btreePath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER + Path btreePath = Paths.get(indexDir, invalidComponentRange + AbstractLSMIndexFileManager.DELIMITER + AbstractLSMIndexFileManager.BTREE_SUFFIX); - Path filterPath = Paths.get(indexDir, invalidComponentTimestamp + AbstractLSMIndexFileManager.DELIMITER + Path filterPath = Paths.get(indexDir, invalidComponentRange + 
AbstractLSMIndexFileManager.DELIMITER + AbstractLSMIndexFileManager.BLOOM_FILTER_SUFFIX); Files.createFile(btreePath); Files.createFile(filterPath); @@ -156,14 +147,14 @@ DatasetResourceReference drr = DatasetResourceReference.of(localResource); IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider(); IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr); - Optional<String> validComponentTimestamp = indexCheckpointManager.getValidComponentTimestamp(); - Assert.assertTrue(validComponentTimestamp.isPresent()); + long validComponentSequence = indexCheckpointManager.getValidComponentSequence(); + Assert.assertTrue(validComponentSequence > Long.MIN_VALUE); File[] indexRemainingFiles = indexDirRef.getFile().listFiles(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER); Assert.assertNotNull(indexRemainingFiles); long validComponentFilesCount = Arrays.stream(indexRemainingFiles) - .filter(file -> file.getName().startsWith(validComponentTimestamp.get())).count(); + .filter(file -> file.getName().startsWith(String.valueOf(validComponentSequence))).count(); Assert.assertTrue(validComponentFilesCount > 0); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 50a4bef..682eaea 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.context; import static org.apache.asterix.common.metadata.MetadataIndexImmutableProperties.METADATA_DATASETS_PARTITIONS; +import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID; import java.io.IOException; import java.io.OutputStream; 
@@ -112,6 +113,9 @@ datasetResource = getDatasetLifecycle(did); } datasetResource.register(resource, (ILSMIndex) index); + if (((ILSMIndex) index).isPrimaryIndex()) { + initializeDatasetPartitionValidComponentId(datasetResource, resource); + } } private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { @@ -600,4 +604,24 @@ indexInfo.setOpen(false); } } + + private void initializeDatasetPartitionValidComponentId(DatasetResource datasetResource, + LocalResource primaryIndexResource) { + final IndexInfo indexInfo = datasetResource.getIndexInfo(primaryIndexResource.getId()); + final int partition = indexInfo.getPartition(); + final ILSMComponentIdGenerator componentIdGenerator = + getComponentIdGenerator(datasetResource.getDatasetID(), partition); + final long indexLastValidComponentId = getIndexLastValidComponentId(indexInfo.getLocalResource()); + componentIdGenerator.init(indexLastValidComponentId); + } + + private long getIndexLastValidComponentId(LocalResource resource) { + try { + final DatasetResourceReference datasetResource = DatasetResourceReference.of(resource); + return Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(), + MIN_VALID_COMPONENT_ID); + } catch (HyracksDataException e) { + throw new IllegalStateException(e); + } + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index ea53d68..606d63c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -37,6 +37,7 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus; @@ -44,9 +45,9 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; @@ -68,17 +69,19 @@ private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; protected final DatasetInfo dsInfo; protected final ILSMIndex lsmIndex; + private final ILSMComponentIdGenerator componentIdGenerator; private long firstLsnForCurrentMemoryComponent = 0L; private long persistenceLsn = 0L; private int pendingFlushes = 0; private Deque<ILSMComponentId> componentIds = new ArrayDeque<>(); + private boolean firstAllocation = true; - public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentId nextComponentId, + public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, ILSMComponentIdGenerator componentIdGenerator, IIndexCheckpointManagerProvider indexCheckpointManagerProvider) { this.dsInfo = dsInfo; this.lsmIndex = lsmIndex; + this.componentIdGenerator = componentIdGenerator; 
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; - componentIds.add(nextComponentId); } @Override @@ -132,8 +135,8 @@ operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L; final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID); final ResourceReference ref = ResourceReference.of(target.getAbsolutePath()); - final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName()); - indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId()); + final long componentSequence = IndexComponentFileReference.of(ref.getName()).getSequenceEnd(); + indexCheckpointManagerProvider.get(ref).flushed(componentSequence, lsn, id.getMaxId()); } private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException { @@ -275,6 +278,9 @@ @Override public void allocated(ILSMMemoryComponent component) throws HyracksDataException { - // No Op + if (firstAllocation) { + firstAllocation = false; + componentIds.add(componentIdGenerator.getId()); + } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java index 25cd8b2..68ffd6a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java @@ -70,8 +70,8 @@ @Override public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException { - return new LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, - getComponentIdGenerator().getId(), getIndexCheckpointManagerProvider()); + return new 
LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, getComponentIdGenerator(), + getIndexCheckpointManagerProvider()); } @Override diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java index 2c0872c..dd9ede5 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java @@ -27,33 +27,33 @@ /** * Initializes the first checkpoint of an index with low watermark {@code lsn} * - * @param componentTimestamp + * @param validComponentSequence * @param lsn * @throws HyracksDataException */ - void init(String componentTimestamp, long lsn) throws HyracksDataException; + void init(long validComponentSequence, long lsn) throws HyracksDataException; /** * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated - * with the latest valid {@code componentTimestamp} and low watermark {@code lsn} + * with the latest valid {@code componentSequence} and low watermark {@code lsn} * - * @param componentTimestamp + * @param componentSequence * @param lsn * @throws HyracksDataException */ - void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException; + void flushed(long componentSequence, long lsn, long componentId) throws HyracksDataException; /** * Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated - * with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the + * with the latest valid {@code componentSequence} and the local lsn mapping of {@code masterLsn} is set as the * new low watermark. 
* - * @param componentTimestamp + * @param componentSequence * @param masterLsn * @param componentId * @throws HyracksDataException */ - void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException; + void replicated(long componentSequence, long masterLsn, long componentId) throws HyracksDataException; /** * Called when a flush log is received and replicated from master. The mapping between @@ -88,12 +88,12 @@ void delete(); /** - * Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()} + * Gets the index last valid component sequence. * - * @return the index last valid component timestamp + * @return the index last valid component sequence * @throws HyracksDataException */ - Optional<String> getValidComponentTimestamp() throws HyracksDataException; + long getValidComponentSequence() throws HyracksDataException; /** * Gets the number of valid checkpoints the index has. @@ -110,12 +110,12 @@ IndexCheckpoint getLatest() throws HyracksDataException; /** - * Advance the last valid component timestamp. Used for replicated bulkloaded components + * Advance the last valid component sequence. Used for replicated bulkloaded components * - * @param timeStamp + * @param componentSequence * @throws HyracksDataException */ - void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException; + void advanceValidComponentSequence(long componentSequence) throws HyracksDataException; /** * Set the last component id. 
Used during recovery or after component delete diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java index 73d3122..f84167e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java @@ -34,22 +34,22 @@ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final long INITIAL_CHECKPOINT_ID = 0; private long id; - private String validComponentTimestamp; + private long validComponentSequence; private long lowWatermark; private long lastComponentId; private Map<Long, Long> masterNodeFlushMap; - public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) { + public static IndexCheckpoint first(long lastComponentSequence, long lowWatermark) { IndexCheckpoint firstCheckpoint = new IndexCheckpoint(); firstCheckpoint.id = INITIAL_CHECKPOINT_ID; firstCheckpoint.lowWatermark = lowWatermark; - firstCheckpoint.validComponentTimestamp = lastComponentTimestamp; + firstCheckpoint.validComponentSequence = lastComponentSequence; firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(); firstCheckpoint.masterNodeFlushMap = new HashMap<>(); return firstCheckpoint; } - public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp, + public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, long validComponentSequence, long lastComponentId) { if (lowWatermark < latest.getLowWatermark()) { throw new IllegalStateException("Low watermark should always be increasing"); @@ -58,7 +58,7 @@ next.id = latest.getId() + 1; next.lowWatermark = lowWatermark; next.lastComponentId = lastComponentId; - next.validComponentTimestamp = validComponentTimestamp; 
+ next.validComponentSequence = validComponentSequence; next.masterNodeFlushMap = latest.getMasterNodeFlushMap(); // remove any lsn from the map that wont be used anymore next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark); @@ -69,8 +69,8 @@ private IndexCheckpoint() { } - public String getValidComponentTimestamp() { - return validComponentTimestamp; + public long getValidComponentSequence() { + return validComponentSequence; } public long getLowWatermark() { diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java index 29a2aa0..4b18e1d 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java @@ -19,9 +19,8 @@ package org.apache.asterix.test.ioopcallbacks; -import java.text.Format; -import java.text.SimpleDateFormat; -import java.util.Date; +import static org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId.MIN_VALID_COMPONENT_ID; + import java.util.HashMap; import java.util.Map; @@ -38,7 +37,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata; import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; @@ -64,13 +62,11 @@ * 7. 
destroy */ - private static final Format FORMATTER = - new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT); + private static long COMPONENT_SEQUENCE = 0; private static String getComponentFileName() { - Date date = new Date(); - String ts = FORMATTER.format(date); - return ts + '_' + ts; + final String sequence = String.valueOf(COMPONENT_SEQUENCE++); + return sequence + '_' + sequence; } @Test @@ -83,8 +79,9 @@ Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class)); DatasetInfo dsInfo = new DatasetInfo(101, null); LSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents); - LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), - mockIndexCheckpointManagerProvider()); + idGenerator.init(MIN_VALID_COMPONENT_ID); + LSMIOOperationCallback callback = + new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); //Flush first idGenerator.refresh(); long flushLsn = 1L; @@ -142,21 +139,18 @@ int numMemoryComponents = 2; DatasetInfo dsInfo = new DatasetInfo(101, null); ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents); + idGenerator.init(MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), - mockIndexCheckpointManagerProvider()); + LSMIOOperationCallback callback = + new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); + callback.allocated(mockComponent); ILSMComponentId initialId = idGenerator.getId(); - // simulate a 
partition is flushed before allocated idGenerator.refresh(); long flushLsn = 1L; ILSMComponentId nextComponentId = idGenerator.getId(); - Map<String, Object> flushMap = new HashMap<>(); - flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn); - flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId); - callback.allocated(mockComponent); callback.recycled(mockComponent); checkMemoryComponent(initialId, mockComponent); } @@ -166,13 +160,15 @@ int numMemoryComponents = 2; DatasetInfo dsInfo = new DatasetInfo(101, null); ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator(numMemoryComponents); + idGenerator.init(MIN_VALID_COMPONENT_ID); ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class); Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents); ILSMMemoryComponent mockComponent = Mockito.mock(AbstractLSMMemoryComponent.class); Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent); - LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator.getId(), - mockIndexCheckpointManagerProvider()); + LSMIOOperationCallback callback = + new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator, mockIndexCheckpointManagerProvider()); String indexId = "mockIndexId"; + callback.allocated(mockComponent); ILSMComponentId id = idGenerator.getId(); callback.recycled(mockComponent); checkMemoryComponent(id, mockComponent); @@ -223,7 +219,8 @@ IIndexCheckpointManagerProvider indexCheckpointManagerProvider = Mockito.mock(IIndexCheckpointManagerProvider.class); IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class); - Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong()); + Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.anyLong(), Mockito.anyLong(), + Mockito.anyLong()); 
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any()); return indexCheckpointManagerProvider; } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java index d4d601c..448613b 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.common.LocalResource; /** @@ -62,20 +63,19 @@ DatasetResourceReference ref = DatasetResourceReference.of(ls); final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref); indexCheckpointManager.delete(); - // Get most recent timestamp of existing files to avoid deletion + // Get most recent sequence of existing files to avoid deletion Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref); String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER); if (files == null) { throw HyracksDataException .create(new IOException(indexPath + " is not a directory or an IO Error occurred")); } - String mostRecentTimestamp = null; + long maxComponentSequence = Long.MIN_VALUE; for (String file : files) { - String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file); - mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0 - ? 
nextTimeStamp : mostRecentTimestamp; + maxComponentSequence = + Math.max(maxComponentSequence, IndexComponentFileReference.of(file).getSequenceEnd()); } - indexCheckpointManager.init(mostRecentTimestamp, currentLSN); + indexCheckpointManager.init(maxComponentSequence, currentLSN); } ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java index d5dc51d..55dd5d4 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java @@ -63,8 +63,8 @@ final IIOManager ioManager = appCtx.getIoManager(); final FileReference localPath = ioManager.resolve(componentFile); final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath()); - final String componentId = PersistentLocalResourceRepository.getComponentId(componentFile); - return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId); + final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile); + return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence); } @Override diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java index a4f9b43..b360a09 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java +++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java @@ -37,6 +37,7 @@ import org.apache.asterix.replication.sync.IndexSynchronizer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; /** * A task to mark a replicated LSM component as valid @@ -57,7 +58,7 @@ public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { try { if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) { - updateBulkLoadedLastComponentTimestamp(appCtx); + updateBulkLoadedLastComponentSequence(appCtx); } else if (masterLsn != IndexSynchronizer.MERGE_LSN) { ensureComponentLsnFlushed(appCtx); } @@ -70,13 +71,12 @@ } } - private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException { + private void updateBulkLoadedLastComponentSequence(INcApplicationContext appCtx) throws HyracksDataException { final ResourceReference indexRef = ResourceReference.of(file); final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider(); final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); - final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()); - indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime); - + final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd(); + indexCheckpointManager.advanceValidComponentSequence(componentSequence); } private void ensureComponentLsnFlushed(INcApplicationContext appCtx) @@ -95,8 +95,8 @@ indexCheckpointManager.wait(replicationTimeOut); replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); } - final String componentEndTime = 
AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName()); - indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId); + final long componentSequence = IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd(); + indexCheckpointManager.replicated(componentSequence, masterLsn, lastComponentId); } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java index 20663d1..f53d448 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -98,7 +98,7 @@ final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef); final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN(); indexCheckpointManager.delete(); - indexCheckpointManager.init(null, currentLSN); + indexCheckpointManager.init(Long.MIN_VALUE, currentLSN); LOGGER.info(() -> "Checkpoint index: " + indexRef); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index f9718c4..c0da095 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -22,7 +22,6 @@ import static org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME; import static 
org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE; import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER; -import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT; import java.io.File; import java.io.FilenameFilter; @@ -30,12 +29,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.text.Format; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -71,6 +67,7 @@ import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.common.ILocalResourceRepository; import org.apache.hyracks.storage.common.LocalResource; import org.apache.hyracks.util.ExitUtil; @@ -128,9 +125,6 @@ return true; } }; - - private static final ThreadLocal<SimpleDateFormat> THREAD_LOCAL_FORMATTER = - ThreadLocal.withInitial(() -> new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT)); // Finals private final IIOManager ioManager; @@ -202,7 +196,7 @@ byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry)); final Path path = Paths.get(resourceFile.getAbsolutePath()); Files.write(path, bytes); - indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0); + indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(Long.MIN_VALUE, 0); deleteResourceFileMask(resourceFile); } catch (Exception e) { cleanup(resourceFile); @@ -481,29 +475,17 @@ } private void deleteIndexInvalidComponents(File index) throws IOException, ParseException { - 
final Format formatter = THREAD_LOCAL_FORMATTER.get(); final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER); if (indexComponentFiles == null) { throw new IOException(index + " doesn't exist or an IO error occurred"); } - final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp(); - if (!validComponentTimestamp.isPresent()) { - // index doesn't have any valid component, delete all - for (File componentFile : indexComponentFiles) { + final long validComponentSequence = getIndexCheckpointManager(index).getValidComponentSequence(); + for (File componentFile : indexComponentFiles) { + // delete any file with start sequence > valid component sequence + final long fileStart = IndexComponentFileReference.of(componentFile.getName()).getSequenceStart(); + if (fileStart > validComponentSequence) { LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath()); Files.delete(componentFile.toPath()); - } - } else { - final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get()); - for (File componentFile : indexComponentFiles) { - // delete any file with startTime > validTimestamp - final String fileStartTimeStr = - AbstractLSMIndexFileManager.getComponentStartTime(componentFile.getName()); - final Date fileStartTime = (Date) formatter.parseObject(fileStartTimeStr); - if (fileStartTime.after(validTimestamp)) { - LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath()); - Files.delete(componentFile.toPath()); - } } } } @@ -545,8 +527,8 @@ long fileSize = file.length(); totalSize += fileSize; if (isComponentFile(resolvedPath.getFile(), file.getName())) { - String componentId = getComponentId(file.getAbsolutePath()); - componentsStats.put(componentId, componentsStats.getOrDefault(componentId, 0L) + fileSize); + String componentSeq = getComponentSequence(file.getAbsolutePath()); + componentsStats.put(componentSeq, 
componentsStats.getOrDefault(componentSeq, 0L) + fileSize); } } } @@ -576,17 +558,16 @@ } /** - * Gets a component id based on its unique timestamp. - * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b - * will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439 + * Gets a component sequence based on its unique timestamp. + * e.g. a component file 1_3_b + * will return a component sequence 1_3 * - * @param componentFile - * any component file - * @return The component id + * @param componentFile any component file + * @return The component sequence */ - public static String getComponentId(String componentFile) { + public static String getComponentSequence(String componentFile) { final ResourceReference ref = ResourceReference.of(componentFile); - return ref.getName().substring(0, ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER)); + return IndexComponentFileReference.of(ref.getName()).getSequence(); } private static boolean isComponentMask(File mask) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java index 7fbde73..2240fd9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeFileManager.java @@ -34,6 +34,7 @@ import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import 
org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; @@ -53,21 +54,14 @@ @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { - String ts = getCurrentTimestamp(); - String baseName = ts + DELIMITER + ts; - // Begin timestamp and end timestamp are identical since it is a flush + String baseName = getNextComponentSequence(btreeFilter); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null, hasBloomFilter ? baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); } @Override - public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) - throws HyracksDataException { - String[] firstTimestampRange = firstFileName.split(DELIMITER); - String[] lastTimestampRange = lastFileName.split(DELIMITER); - - String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1]; - // Get the range of timestamps by taking the earliest and the latest timestamps + public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { + final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null, hasBloomFilter ? 
baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX) : null); } @@ -75,17 +69,17 @@ @Override public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException { List<LSMComponentFileReferences> validFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>(); // create transaction filter <to hide transaction files> FilenameFilter transactionFilter = getTransactionFileFilter(false); // List of valid BTree files. cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles, btreeFactory.getBufferCache()); HashSet<String> btreeFilesSet = new HashSet<>(); - for (ComparableFileName cmpFileName : allBTreeFiles) { - int index = cmpFileName.fileName.lastIndexOf(DELIMITER); - btreeFilesSet.add(cmpFileName.fileName.substring(0, index)); + for (IndexComponentFileReference cmpFileName : allBTreeFiles) { + int index = cmpFileName.getFileName().lastIndexOf(DELIMITER); + btreeFilesSet.add(cmpFileName.getFileName().substring(0, index)); } if (hasBloomFilter) { @@ -104,53 +98,51 @@ // Special case: sorting is not required if (allBTreeFiles.size() == 1 && (!hasBloomFilter || allBloomFilterFiles.size() == 1)) { - validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, null, - hasBloomFilter ? allBloomFilterFiles.get(0).fileRef : null)); + validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(), null, + hasBloomFilter ? allBloomFilterFiles.get(0).getFileRef() : null)); return validFiles; } - // Sorts files names from earliest to latest timestamp. + // Sorts files names from earliest to latest sequence. 
Collections.sort(allBTreeFiles); if (hasBloomFilter) { Collections.sort(allBloomFilterFiles); } - List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>(); - ComparableFileName lastBTree = allBTreeFiles.get(0); + List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>(); + IndexComponentFileReference lastBTree = allBTreeFiles.get(0); validComparableBTreeFiles.add(lastBTree); - List<ComparableFileName> validComparableBloomFilterFiles = null; - ComparableFileName lastBloomFilter = null; + List<IndexComponentFileReference> validComparableBloomFilterFiles = null; + IndexComponentFileReference lastBloomFilter = null; if (hasBloomFilter) { validComparableBloomFilterFiles = new ArrayList<>(); lastBloomFilter = allBloomFilterFiles.get(0); validComparableBloomFilterFiles.add(lastBloomFilter); } - ComparableFileName currentBTree; - ComparableFileName currentBloomFilter = null; + IndexComponentFileReference currentBTree; + IndexComponentFileReference currentBloomFilter = null; for (int i = 1; i < allBTreeFiles.size(); i++) { currentBTree = allBTreeFiles.get(i); if (hasBloomFilter) { currentBloomFilter = allBloomFilterFiles.get(i); } - // Current start timestamp is greater than last stop timestamp. - if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0 - && (!hasBloomFilter || currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0)) { + // Current start sequence is greater than last stop sequence. 
+ if (currentBTree.isMoreRecentThan(lastBTree) + && (!hasBloomFilter || currentBloomFilter.isMoreRecentThan(lastBloomFilter))) { validComparableBTreeFiles.add(currentBTree); lastBTree = currentBTree; if (hasBloomFilter) { validComparableBloomFilterFiles.add(currentBloomFilter); lastBloomFilter = currentBloomFilter; } - } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0 - && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0 - && (!hasBloomFilter || (currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0 - && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0))) { + } else if (currentBTree.isWithin(lastBTree) + && (!hasBloomFilter || currentBloomFilter.isWithin(lastBloomFilter))) { // Invalid files are completely contained in last interval. - delete(btreeFactory.getBufferCache(), currentBTree.fullPath); + delete(btreeFactory.getBufferCache(), currentBTree.getFullPath()); if (hasBloomFilter) { - delete(btreeFactory.getBufferCache(), currentBloomFilter.fullPath); + delete(btreeFactory.getBufferCache(), currentBloomFilter.getFullPath()); } } else { // This scenario should not be possible. @@ -161,21 +153,21 @@ // Sort valid files in reverse lexicographical order, such that newer // files come first. 
Collections.sort(validComparableBTreeFiles, recencyCmp); - Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator(); - Iterator<ComparableFileName> bloomFilterFileIter = null; + Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator(); + Iterator<IndexComponentFileReference> bloomFilterFileIter = null; if (hasBloomFilter) { Collections.sort(validComparableBloomFilterFiles, recencyCmp); bloomFilterFileIter = validComparableBloomFilterFiles.iterator(); } - ComparableFileName cmpBTreeFileName = null; - ComparableFileName cmpBloomFilterFileName = null; + IndexComponentFileReference cmpBTreeFileName = null; + IndexComponentFileReference cmpBloomFilterFileName = null; while (btreeFileIter.hasNext() && (hasBloomFilter ? bloomFilterFileIter.hasNext() : true)) { cmpBTreeFileName = btreeFileIter.next(); if (hasBloomFilter) { cmpBloomFilterFileName = bloomFilterFileIter.next(); } - validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, null, - hasBloomFilter ? cmpBloomFilterFileName.fileRef : null)); + validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(), null, + hasBloomFilter ? 
cmpBloomFilterFileName.getFileRef() : null)); } return validFiles; @@ -183,11 +175,10 @@ @Override public LSMComponentFileReferences getNewTransactionFileReference() throws IOException { - String ts = getCurrentTimestamp(); + String sequence = getNextComponentSequence(btreeFilter); // Create transaction lock file - IoUtil.create(baseDir.getChild(TXN_PREFIX + ts)); - String baseName = ts + DELIMITER + ts; - // Begin timestamp and end timestamp are identical since it is a transaction + IoUtil.create(baseDir.getChild(TXN_PREFIX + sequence)); + String baseName = getNextComponentSequence(btreeFilter); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), null, baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java index 23c2367..8fb3751 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyFileManager.java @@ -35,6 +35,7 @@ import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; @@ -58,23 +59,15 @@ @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { - String ts = 
getCurrentTimestamp(); - String baseName = ts + DELIMITER + ts; - // Begin timestamp and end timestamp are identical since it is a flush + String baseName = getNextComponentSequence(btreeFilter); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); } @Override - public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) - throws HyracksDataException { - String[] firstTimestampRange = firstFileName.split(DELIMITER); - String[] lastTimestampRange = lastFileName.split(DELIMITER); - - String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1]; - // Get the range of timestamps by taking the earliest and the latest - // timestamps + public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { + final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); @@ -83,18 +76,17 @@ @Override public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException { List<LSMComponentFileReferences> validFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBuddyBTreeFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBuddyBTreeFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>(); // Create transaction file filter FilenameFilter transactionFilefilter = 
getTransactionFileFilter(false); // Gather files. cleanupAndGetValidFilesInternal(getCompoundFilter(btreeFilter, transactionFilefilter), btreeFactory, allBTreeFiles, btreeFactory.getBufferCache()); HashSet<String> btreeFilesSet = new HashSet<>(); - for (ComparableFileName cmpFileName : allBTreeFiles) { - int index = cmpFileName.fileName.lastIndexOf(DELIMITER); - btreeFilesSet.add(cmpFileName.fileName.substring(0, index)); + for (IndexComponentFileReference cmpFileName : allBTreeFiles) { + btreeFilesSet.add(cmpFileName.getSequence()); } validateFiles(btreeFilesSet, allBuddyBTreeFiles, getCompoundFilter(buddyBtreeFilter, transactionFilefilter), buddyBtreeFactory, btreeFactory.getBufferCache()); @@ -109,52 +101,47 @@ return validFiles; } if (allBTreeFiles.size() == 1 && allBuddyBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) { - validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).fileRef, - allBuddyBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef)); + validFiles.add(new LSMComponentFileReferences(allBTreeFiles.get(0).getFileRef(), + allBuddyBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef())); return validFiles; } - // Sorts files names from earliest to latest timestamp. + // Sorts files names from earliest to latest sequence. 
Collections.sort(allBTreeFiles); Collections.sort(allBuddyBTreeFiles); Collections.sort(allBloomFilterFiles); - List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>(); - ComparableFileName lastBTree = allBTreeFiles.get(0); + List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>(); + IndexComponentFileReference lastBTree = allBTreeFiles.get(0); validComparableBTreeFiles.add(lastBTree); - List<ComparableFileName> validComparableBuddyBTreeFiles = new ArrayList<>(); - ComparableFileName lastBuddyBTree = allBuddyBTreeFiles.get(0); + List<IndexComponentFileReference> validComparableBuddyBTreeFiles = new ArrayList<>(); + IndexComponentFileReference lastBuddyBTree = allBuddyBTreeFiles.get(0); validComparableBuddyBTreeFiles.add(lastBuddyBTree); - List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>(); - ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0); + List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>(); + IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0); validComparableBloomFilterFiles.add(lastBloomFilter); for (int i = 1; i < allBTreeFiles.size(); i++) { - ComparableFileName currentBTree = allBTreeFiles.get(i); - ComparableFileName currentBuddyBTree = allBuddyBTreeFiles.get(i); - ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i); - // Current start timestamp is greater than last stop timestamp. 
- if (currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0 - && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[1]) > 0 - && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) { + IndexComponentFileReference currentBTree = allBTreeFiles.get(i); + IndexComponentFileReference currentBuddyBTree = allBuddyBTreeFiles.get(i); + IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i); + // Current start sequence is greater than last stop sequence + if (currentBTree.isMoreRecentThan(lastBTree) && currentBuddyBTree.isMoreRecentThan(lastBuddyBTree) + && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) { validComparableBTreeFiles.add(currentBTree); validComparableBuddyBTreeFiles.add(currentBuddyBTree); validComparableBloomFilterFiles.add(currentBloomFilter); lastBTree = currentBTree; lastBuddyBTree = currentBuddyBTree; lastBloomFilter = currentBloomFilter; - } else if (currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0 - && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0 - && currentBuddyBTree.interval[0].compareTo(lastBuddyBTree.interval[0]) >= 0 - && currentBuddyBTree.interval[1].compareTo(lastBuddyBTree.interval[1]) <= 0 - && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0 - && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) { - // Invalid files are completely contained in last interval. - delete(treeFactory.getBufferCache(), currentBTree.fullPath); - delete(treeFactory.getBufferCache(), currentBuddyBTree.fullPath); - delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath); + } else if (currentBTree.isWithin(lastBTree) && currentBuddyBTree.isWithin(lastBuddyBTree) + && currentBloomFilter.isWithin(lastBloomFilter)) { + // Invalid files are completely contained in last sequence. 
+ delete(treeFactory.getBufferCache(), currentBTree.getFullPath()); + delete(treeFactory.getBufferCache(), currentBuddyBTree.getFullPath()); + delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath()); } else { // This scenario should not be possible. throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir); @@ -163,19 +150,19 @@ // Sort valid files in reverse lexicographical order, such that newer // files come first. - Collections.sort(validComparableBTreeFiles, recencyCmp); - Collections.sort(validComparableBuddyBTreeFiles, recencyCmp); - Collections.sort(validComparableBloomFilterFiles, recencyCmp); + validComparableBTreeFiles.sort(recencyCmp); + validComparableBuddyBTreeFiles.sort(recencyCmp); + validComparableBloomFilterFiles.sort(recencyCmp); - Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator(); - Iterator<ComparableFileName> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator(); - Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator(); + Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator(); + Iterator<IndexComponentFileReference> buddyBtreeFileIter = validComparableBuddyBTreeFiles.iterator(); + Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator(); while (btreeFileIter.hasNext() && buddyBtreeFileIter.hasNext()) { - ComparableFileName cmpBTreeFileName = btreeFileIter.next(); - ComparableFileName cmpBuddyBTreeFileName = buddyBtreeFileIter.next(); - ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next(); - validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.fileRef, cmpBuddyBTreeFileName.fileRef, - cmpBloomFilterFileName.fileRef)); + IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next(); + IndexComponentFileReference cmpBuddyBTreeFileName = buddyBtreeFileIter.next(); + IndexComponentFileReference cmpBloomFilterFileName 
= bloomFilterFileIter.next(); + validFiles.add(new LSMComponentFileReferences(cmpBTreeFileName.getFileRef(), + cmpBuddyBTreeFileName.getFileRef(), cmpBloomFilterFileName.getFileRef())); } return validFiles; @@ -183,10 +170,9 @@ @Override public LSMComponentFileReferences getNewTransactionFileReference() throws IOException { - String ts = getCurrentTimestamp(); // Create transaction lock file - Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts)); - String baseName = ts + DELIMITER + ts; + String baseName = getNextComponentSequence(btreeFilter); + Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName)); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETE_TREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java index e6aa2d1..0af06d7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java @@ -39,4 +39,10 @@ */ int getCurrentComponentIndex(); + /** + * Initializes this {@link ILSMComponentIdGenerator} by setting the last used id + * + * @param lastUsedId + */ + void init(long lastUsedId); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java index 
57fd01d..1f481c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java @@ -21,12 +21,9 @@ import java.io.FilenameFilter; import java.io.IOException; -import java.text.Format; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.Date; import java.util.HashSet; import java.util.List; @@ -42,7 +39,9 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.ICachedPage; import org.apache.hyracks.storage.common.file.BufferedFileHandle; +import org.apache.hyracks.util.annotations.NotThreadSafe; +@NotThreadSafe public abstract class AbstractLSMIndexFileManager implements ILSMIndexFileManager { public enum TreeIndexState { @@ -76,22 +75,18 @@ */ public static final String TXN_PREFIX = ".T"; - public static final String COMPONENT_TIMESTAMP_FORMAT = "yyyy-MM-dd-HH-mm-ss-SSS"; - public static final FilenameFilter COMPONENT_FILES_FILTER = (dir, name) -> !name.startsWith("."); protected static final FilenameFilter txnFileNameFilter = (dir, name) -> name.startsWith(TXN_PREFIX); protected static FilenameFilter bloomFilterFilter = (dir, name) -> !name.startsWith(".") && name.endsWith(BLOOM_FILTER_SUFFIX); - protected static final FilenameFilter dummyFilter = (dir, name) -> true; protected static final Comparator<String> cmp = new FileNameComparator(); + private static final FilenameFilter dummyFilter = (dir, name) -> true; protected final IIOManager ioManager; // baseDir should reflect dataset name and partition name and be absolute protected final FileReference baseDir; - protected final Format formatter = new SimpleDateFormat(COMPONENT_TIMESTAMP_FORMAT); - protected 
final Comparator<ComparableFileName> recencyCmp = new RecencyComparator(); + protected final Comparator<IndexComponentFileReference> recencyCmp = new RecencyComparator(); protected final TreeIndexFactory<? extends ITreeIndex> treeFactory; - private String prevTimestamp = null; public AbstractLSMIndexFileManager(IIOManager ioManager, FileReference file, TreeIndexFactory<? extends ITreeIndex> treeFactory) { @@ -131,18 +126,18 @@ } protected void cleanupAndGetValidFilesInternal(FilenameFilter filter, - TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles, + TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles, IBufferCache bufferCache) throws HyracksDataException { String[] files = listDirFiles(baseDir, filter); for (String fileName : files) { FileReference fileRef = baseDir.getChild(fileName); if (treeFactory == null) { - allFiles.add(new ComparableFileName(fileRef)); + allFiles.add(IndexComponentFileReference.of(fileRef)); continue; } TreeIndexState idxState = isValidTreeIndex(treeFactory.createIndexInstance(fileRef)); if (idxState == TreeIndexState.VALID) { - allFiles.add(new ComparableFileName(fileRef)); + allFiles.add(IndexComponentFileReference.of(fileRef)); } else if (idxState == TreeIndexState.INVALID) { bufferCache.deleteFile(fileRef); } @@ -167,18 +162,16 @@ return files; } - protected void validateFiles(HashSet<String> groundTruth, ArrayList<ComparableFileName> validFiles, + protected void validateFiles(HashSet<String> groundTruth, ArrayList<IndexComponentFileReference> validFiles, FilenameFilter filter, TreeIndexFactory<? 
extends ITreeIndex> treeFactory, IBufferCache bufferCache) throws HyracksDataException { - ArrayList<ComparableFileName> tmpAllInvListsFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> tmpAllInvListsFiles = new ArrayList<>(); cleanupAndGetValidFilesInternal(filter, treeFactory, tmpAllInvListsFiles, bufferCache); - for (ComparableFileName cmpFileName : tmpAllInvListsFiles) { - int index = cmpFileName.fileName.lastIndexOf(DELIMITER); - String file = cmpFileName.fileName.substring(0, index); - if (groundTruth.contains(file)) { + for (IndexComponentFileReference cmpFileName : tmpAllInvListsFiles) { + if (groundTruth.contains(cmpFileName.getSequence())) { validFiles.add(cmpFileName); } else { - delete(bufferCache, cmpFileName.fullPath); + delete(bufferCache, cmpFileName.getFullPath()); } } } @@ -198,30 +191,20 @@ @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { - String ts = getCurrentTimestamp(); - // Begin timestamp and end timestamp are identical since it is a flush - return new LSMComponentFileReferences(baseDir.getChild(ts + DELIMITER + ts), null, null); + final String sequence = getNextComponentSequence(COMPONENT_FILES_FILTER); + return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null); } @Override - public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) - throws HyracksDataException { - String[] firstTimestampRange = firstFileName.split(DELIMITER); - String[] lastTimestampRange = lastFileName.split(DELIMITER); - String start = firstTimestampRange[0]; - String end = lastTimestampRange[1]; - if (end.compareTo(start) <= 0) { - throw new IllegalArgumentException( - "A Merge file must have end greater than start. 
Found end: " + end + " and start: " + start); - } - // Get the range of timestamps by taking the earliest and the latest timestamps - return new LSMComponentFileReferences(baseDir.getChild(start + DELIMITER + end), null, null); + public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { + final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); + return new LSMComponentFileReferences(baseDir.getChild(baseName), null, null); } @Override public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException { List<LSMComponentFileReferences> validFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allFiles = new ArrayList<>(); // Gather files and delete invalid files // There are two types of invalid files: @@ -235,40 +218,37 @@ } if (allFiles.size() == 1) { - validFiles.add(new LSMComponentFileReferences(allFiles.get(0).fileRef, null, null)); + validFiles.add(new LSMComponentFileReferences(allFiles.get(0).getFileRef(), null, null)); return validFiles; } - // Sorts files names from earliest to latest timestamp. + // Sorts files names from earliest to latest Collections.sort(allFiles); - List<ComparableFileName> validComparableFiles = new ArrayList<>(); - ComparableFileName last = allFiles.get(0); + List<IndexComponentFileReference> validComparableFiles = new ArrayList<>(); + IndexComponentFileReference last = allFiles.get(0); validComparableFiles.add(last); for (int i = 1; i < allFiles.size(); i++) { - ComparableFileName current = allFiles.get(i); - // The current start timestamp is greater than last stop timestamp so current is valid. - if (current.interval[0].compareTo(last.interval[1]) > 0) { + IndexComponentFileReference current = allFiles.get(i); + if (current.isMoreRecentThan(last)) { + // The current start sequence is greater than last stop sequence so current is valid. 
validComparableFiles.add(current); last = current; - } else if (current.interval[0].compareTo(last.interval[0]) >= 0 - && current.interval[1].compareTo(last.interval[1]) <= 0) { + } else if (current.isWithin(last)) { // The current file is completely contained in the interval of the // last file. Thus the last file must contain at least as much information // as the current file, so delete the current file. - delete(treeFactory.getBufferCache(), current.fullPath); + delete(treeFactory.getBufferCache(), current.getFullPath()); } else { // This scenario should not be possible since timestamps are monotonically increasing. throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir); } } - // Sort valid files in reverse lexicographical order, such that newer files come first. - Collections.sort(validComparableFiles, recencyCmp); - for (ComparableFileName cmpFileName : validComparableFiles) { - validFiles.add(new LSMComponentFileReferences(cmpFileName.fileRef, null, null)); + validComparableFiles.sort(recencyCmp); + for (IndexComponentFileReference cmpFileName : validComparableFiles) { + validFiles.add(new LSMComponentFileReferences(cmpFileName.getFileRef(), null, null)); } - return validFiles; } @@ -287,8 +267,7 @@ private static class FileNameComparator implements Comparator<String> { @Override public int compare(String a, String b) { - // Consciously ignoring locale. - return -a.compareTo(b); + return IndexComponentFileReference.of(b).compareTo(IndexComponentFileReference.of(a)); } } @@ -309,45 +288,14 @@ } } - protected class ComparableFileName implements Comparable<ComparableFileName> { - public final FileReference fileRef; - public final String fullPath; - public final String fileName; - - // Timestamp interval. 
- public final String[] interval; - - public ComparableFileName(FileReference fileRef) { - this.fileRef = fileRef; - this.fullPath = fileRef.getFile().getAbsolutePath(); - this.fileName = fileRef.getFile().getName(); - interval = fileName.split(DELIMITER); - } - + private class RecencyComparator implements Comparator<IndexComponentFileReference> { @Override - public int compareTo(ComparableFileName b) { - int startCmp = interval[0].compareTo(b.interval[0]); + public int compare(IndexComponentFileReference a, IndexComponentFileReference b) { + int startCmp = -Long.compare(a.getSequenceStart(), b.getSequenceStart()); if (startCmp != 0) { return startCmp; } - return b.interval[1].compareTo(interval[1]); - } - - @Override - public String toString() { - return "{\"type\" : \"" + (interval[0].equals(interval[1]) ? "flush" : "merge") + "\", \"start\" : \"" - + interval[0] + "\", \"end\" : \"" + interval[1] + "\"}"; - } - } - - private class RecencyComparator implements Comparator<ComparableFileName> { - @Override - public int compare(ComparableFileName a, ComparableFileName b) { - int cmp = -a.interval[0].compareTo(b.interval[0]); - if (cmp != 0) { - return cmp; - } - return -a.interval[1].compareTo(b.interval[1]); + return -Long.compare(a.getSequenceEnd(), b.getSequenceEnd()); } } @@ -382,10 +330,10 @@ return null; } - protected static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) { + private static FilenameFilter createTransactionFilter(String transactionFileName, final boolean inclusive) { final String timeStamp = transactionFileName.substring(transactionFileName.indexOf(TXN_PREFIX) + TXN_PREFIX.length()); - return (dir, name) -> inclusive ? 
name.startsWith(timeStamp) : !name.startsWith(timeStamp); + return (dir, name) -> inclusive == name.startsWith(timeStamp); } protected FilenameFilter getTransactionFileFilter(boolean inclusive) throws HyracksDataException { @@ -406,34 +354,12 @@ return (dir, name) -> filter1.accept(dir, name) && filter2.accept(dir, name); } - /** - * @return The string format of the current timestamp. - * The returned results of this method are guaranteed to not have duplicates. - */ - protected String getCurrentTimestamp() { - Date date = new Date(); - String ts = formatter.format(date); - /** - * prevent a corner case where the same timestamp can be given. - */ - while (prevTimestamp != null && ts.compareTo(prevTimestamp) == 0) { - try { - Thread.sleep(1); - date = new Date(); - ts = formatter.format(date); - } catch (InterruptedException e) { - //ignore - } + protected String getNextComponentSequence(FilenameFilter filenameFilter) throws HyracksDataException { + long maxComponentSeq = -1; + final String[] files = listDirFiles(baseDir, filenameFilter); + for (String fileName : files) { + maxComponentSeq = Math.max(maxComponentSeq, IndexComponentFileReference.of(fileName).getSequenceEnd()); } - prevTimestamp = ts; - return ts; - } - - public static String getComponentStartTime(String fileName) { - return fileName.split(DELIMITER)[0]; - } - - public static String getComponentEndTime(String fileName) { - return fileName.split(DELIMITER)[1]; + return IndexComponentFileReference.getFlushSequence(maxComponentSeq + 1); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java new file mode 100644 index 0000000..bbadf60 --- /dev/null +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/IndexComponentFileReference.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.impls; + +import static org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.DELIMITER; + +import java.util.Objects; + +import org.apache.hyracks.api.io.FileReference; + +public class IndexComponentFileReference implements Comparable<IndexComponentFileReference> { + + private FileReference fileRef; + private String fullPath; + private String fileName; + private long sequenceStart; + private long sequenceEnd; + + private IndexComponentFileReference() { + } + + public static IndexComponentFileReference of(String file) { + final IndexComponentFileReference ref = new IndexComponentFileReference(); + ref.fileName = file; + final String[] splits = file.split(DELIMITER); + ref.sequenceStart = Long.parseLong(splits[0]); + ref.sequenceEnd = Long.parseLong(splits[1]); + return ref; + } + + public static IndexComponentFileReference of(FileReference fileRef) { + final IndexComponentFileReference ref = of(fileRef.getFile().getName()); + 
ref.fileRef = fileRef; + ref.fullPath = fileRef.getFile().getAbsolutePath(); + return ref; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + IndexComponentFileReference that = (IndexComponentFileReference) o; + return Objects.equals(fileName, that.fileName); + } + + @Override + public int hashCode() { + return Objects.hash(fileName); + } + + @Override + public int compareTo(IndexComponentFileReference o) { + int startCmp = Long.compare(sequenceStart, o.sequenceStart); + if (startCmp != 0) { + return startCmp; + } + return Long.compare(o.sequenceEnd, sequenceEnd); + } + + public String getFileName() { + return fileName; + } + + public long getSequenceStart() { + return sequenceStart; + } + + public long getSequenceEnd() { + return sequenceEnd; + } + + public String getFullPath() { + return fullPath; + } + + public FileReference getFileRef() { + return fileRef; + } + + public String getSequence() { + return sequenceStart + DELIMITER + sequenceEnd; + } + + public boolean isMoreRecentThan(IndexComponentFileReference other) { + return sequenceStart > other.getSequenceEnd(); + } + + public boolean isWithin(IndexComponentFileReference other) { + return sequenceStart >= other.getSequenceStart() && sequenceEnd <= other.getSequenceEnd(); + } + + @Override + public String toString() { + return "{\"type\" : \"" + (isFlush() ? 
"flush" : "merge") + "\", \"start\" : \"" + sequenceStart + + "\", \"end\" : \"" + sequenceEnd + "\"}"; + } + + private boolean isFlush() { + return sequenceStart == sequenceEnd; + } + + public static String getFlushSequence(long componentSequence) { + return componentSequence + DELIMITER + componentSequence; + } + + public static String getMergeSequence(String firstComponentName, String lastComponentName) { + long mergeSequenceStart = IndexComponentFileReference.of(firstComponentName).getSequenceStart(); + long mergeSequenceEnd = IndexComponentFileReference.of(lastComponentName).getSequenceEnd(); + return mergeSequenceStart + DELIMITER + mergeSequenceEnd; + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java index c7990a6..cf6c4a2 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentId.java @@ -24,12 +24,14 @@ public class LSMComponentId implements ILSMComponentId { public static final long NOT_FOUND = -1; + public static final long MIN_VALID_COMPONENT_ID = 0; // Used to represent an empty index with no components public static final LSMComponentId EMPTY_INDEX_LAST_COMPONENT_ID = new LSMComponentId(NOT_FOUND, NOT_FOUND); // A default component id used for bulk loaded component - public static final LSMComponentId DEFAULT_COMPONENT_ID = new LSMComponentId(0, 0); + public static final LSMComponentId DEFAULT_COMPONENT_ID = + new LSMComponentId(MIN_VALID_COMPONENT_ID, MIN_VALID_COMPONENT_ID); private long minId; diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java index 3da57fd..e6bf0ab 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java @@ -21,55 +21,52 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; +import org.apache.hyracks.util.annotations.ThreadSafe; /** * A default implementation of {@link ILSMComponentIdGenerator}. - * */ +@ThreadSafe public class LSMComponentIdGenerator implements ILSMComponentIdGenerator { private final int numComponents; private int currentComponentIndex; - protected long previousTimestamp = -1L; + private long lastUsedId; private ILSMComponentId componentId; + private boolean initialized = false; public LSMComponentIdGenerator(int numComponents) { this.numComponents = numComponents; + } + + @Override + public synchronized void init(long lastUsedId) { + this.lastUsedId = lastUsedId; + initialized = true; refresh(); currentComponentIndex = 0; } @Override - public void refresh() { - long ts = getCurrentTimestamp(); - componentId = new LSMComponentId(ts, ts); + public synchronized void refresh() { + if (!initialized) { + throw new IllegalStateException("Attempt to refresh component id before initialziation."); + } + final long nextId = ++lastUsedId; + componentId = new LSMComponentId(nextId, nextId); currentComponentIndex = (currentComponentIndex + 1) % numComponents; } @Override - public ILSMComponentId getId() { + public synchronized ILSMComponentId 
getId() { + if (!initialized) { + throw new IllegalStateException("Attempt to get component id before initialziation."); + } return componentId; } @Override - public int getCurrentComponentIndex() { + public synchronized int getCurrentComponentIndex() { return currentComponentIndex; } - - protected long getCurrentTimestamp() { - long timestamp = System.currentTimeMillis(); - while (timestamp <= previousTimestamp) { - // make sure timestamp is strictly increasing - try { - Thread.sleep(1); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - timestamp = System.currentTimeMillis(); - } - previousTimestamp = timestamp; - return timestamp; - - } - } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java index 2f1eb87..4471102 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexFileManager.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.impls.BTreeFactory; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexFileNameMapper; @@ -57,21 +58,15 @@ @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { - String ts = 
getCurrentTimestamp(); - String baseName = ts + DELIMITER + ts; - // Begin timestamp and end timestamp are identical since it is a flush + String baseName = getNextComponentSequence(deletedKeysBTreeFilter); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); } @Override - public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) - throws HyracksDataException { - String[] firstTimestampRange = firstFileName.split(DELIMITER); - String[] lastTimestampRange = lastFileName.split(DELIMITER); - String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1]; - // Get the range of timestamps by taking the earliest and the latest timestamps + public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) { + final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName); return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + DICT_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + DELETED_KEYS_BTREE_SUFFIX), baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX)); @@ -80,18 +75,17 @@ @Override public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException { List<LSMComponentFileReferences> validFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allDictBTreeFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allInvListsFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allDeletedKeysBTreeFiles = new ArrayList<>(); - ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allDictBTreeFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allInvListsFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> 
allDeletedKeysBTreeFiles = new ArrayList<>(); + ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>(); // Gather files. cleanupAndGetValidFilesInternal(deletedKeysBTreeFilter, btreeFactory, allDeletedKeysBTreeFiles, btreeFactory.getBufferCache()); HashSet<String> deletedKeysBTreeFilesSet = new HashSet<>(); - for (ComparableFileName cmpFileName : allDeletedKeysBTreeFiles) { - int index = cmpFileName.fileName.lastIndexOf(DELIMITER); - deletedKeysBTreeFilesSet.add(cmpFileName.fileName.substring(0, index)); + for (IndexComponentFileReference cmpFileName : allDeletedKeysBTreeFiles) { + deletedKeysBTreeFilesSet.add(cmpFileName.getSequence()); } // TODO: do we really need to validate the inverted lists files or is validating the dict. BTrees is enough? @@ -116,52 +110,48 @@ if (allDictBTreeFiles.size() == 1 && allInvListsFiles.size() == 1 && allDeletedKeysBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) { - validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).fileRef, - allDeletedKeysBTreeFiles.get(0).fileRef, allBloomFilterFiles.get(0).fileRef)); + validFiles.add(new LSMComponentFileReferences(allDictBTreeFiles.get(0).getFileRef(), + allDeletedKeysBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef())); return validFiles; } - // Sorts files names from earliest to latest timestamp. + // Sorts files names from earliest to latest sequence. 
Collections.sort(allDeletedKeysBTreeFiles); Collections.sort(allDictBTreeFiles); Collections.sort(allBloomFilterFiles); - List<ComparableFileName> validComparableDictBTreeFiles = new ArrayList<>(); - ComparableFileName lastDictBTree = allDictBTreeFiles.get(0); + List<IndexComponentFileReference> validComparableDictBTreeFiles = new ArrayList<>(); + IndexComponentFileReference lastDictBTree = allDictBTreeFiles.get(0); validComparableDictBTreeFiles.add(lastDictBTree); - List<ComparableFileName> validComparableDeletedKeysBTreeFiles = new ArrayList<>(); - ComparableFileName lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0); + List<IndexComponentFileReference> validComparableDeletedKeysBTreeFiles = new ArrayList<>(); + IndexComponentFileReference lastDeletedKeysBTree = allDeletedKeysBTreeFiles.get(0); validComparableDeletedKeysBTreeFiles.add(lastDeletedKeysBTree); - List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>(); - ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0); + List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>(); + IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0); validComparableBloomFilterFiles.add(lastBloomFilter); for (int i = 1; i < allDictBTreeFiles.size(); i++) { - ComparableFileName currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i); - ComparableFileName currentDictBTree = allDictBTreeFiles.get(i); - ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i); - // Current start timestamp is greater than last stop timestamp. 
- if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[1]) > 0 - && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[1]) > 0 - && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) { + IndexComponentFileReference currentDeletedKeysBTree = allDeletedKeysBTreeFiles.get(i); + IndexComponentFileReference currentDictBTree = allDictBTreeFiles.get(i); + IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i); + // Current start sequence is greater than last stop sequence. + if (currentDeletedKeysBTree.isMoreRecentThan(lastDeletedKeysBTree) + && currentDictBTree.isMoreRecentThan(lastDictBTree) + && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) { validComparableDictBTreeFiles.add(currentDictBTree); validComparableDeletedKeysBTreeFiles.add(currentDeletedKeysBTree); validComparableBloomFilterFiles.add(currentBloomFilter); lastDictBTree = currentDictBTree; lastDeletedKeysBTree = currentDeletedKeysBTree; lastBloomFilter = currentBloomFilter; - } else if (currentDeletedKeysBTree.interval[0].compareTo(lastDeletedKeysBTree.interval[0]) >= 0 - && currentDeletedKeysBTree.interval[1].compareTo(lastDeletedKeysBTree.interval[1]) <= 0 - && currentDictBTree.interval[0].compareTo(lastDictBTree.interval[0]) >= 0 - && currentDictBTree.interval[1].compareTo(lastDictBTree.interval[1]) <= 0 - && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0 - && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) { - // Invalid files are completely contained in last interval. 
- delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.fullPath); - delete(treeFactory.getBufferCache(), currentDictBTree.fullPath); - delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath); + } else if (currentDeletedKeysBTree.isWithin(lastDeletedKeysBTree) + && currentDictBTree.isWithin(lastDictBTree) && currentBloomFilter.isWithin(lastBloomFilter)) { + // Invalid files are completely contained in last sequence. + delete(treeFactory.getBufferCache(), currentDeletedKeysBTree.getFullPath()); + delete(treeFactory.getBufferCache(), currentDictBTree.getFullPath()); + delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath()); } else { // This scenario should not be possible. throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir); @@ -170,21 +160,20 @@ // Sort valid files in reverse lexicographical order, such that newer // files come first. - Collections.sort(validComparableDictBTreeFiles, recencyCmp); - Collections.sort(validComparableDeletedKeysBTreeFiles, recencyCmp); - Collections.sort(validComparableBloomFilterFiles, recencyCmp); + validComparableDictBTreeFiles.sort(recencyCmp); + validComparableDeletedKeysBTreeFiles.sort(recencyCmp); + validComparableBloomFilterFiles.sort(recencyCmp); - Iterator<ComparableFileName> dictBTreeFileIter = validComparableDictBTreeFiles.iterator(); - Iterator<ComparableFileName> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator(); - Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator(); + Iterator<IndexComponentFileReference> dictBTreeFileIter = validComparableDictBTreeFiles.iterator(); + Iterator<IndexComponentFileReference> deletedKeysBTreeIter = validComparableDeletedKeysBTreeFiles.iterator(); + Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator(); while (dictBTreeFileIter.hasNext() && deletedKeysBTreeIter.hasNext()) { - ComparableFileName cmpDictBTreeFile = 
dictBTreeFileIter.next(); - ComparableFileName cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next(); - ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next(); - validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.fileRef, cmpDeletedKeysBTreeFile.fileRef, - cmpBloomFilterFileName.fileRef)); + IndexComponentFileReference cmpDictBTreeFile = dictBTreeFileIter.next(); + IndexComponentFileReference cmpDeletedKeysBTreeFile = deletedKeysBTreeIter.next(); + IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next(); + validFiles.add(new LSMComponentFileReferences(cmpDictBTreeFile.getFileRef(), + cmpDeletedKeysBTreeFile.getFileRef(), cmpBloomFilterFileName.getFileRef())); } - return validFiles; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java index 2512776..3348407 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeFileManager.java @@ -36,6 +36,7 @@ import org.apache.hyracks.api.util.IoUtil; import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences; import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory; @@ -58,22 +59,15 @@ @Override public LSMComponentFileReferences getRelFlushFileReference() throws HyracksDataException { - String ts = getCurrentTimestamp(); - String baseName = ts + DELIMITER + 
ts;
-        // Begin timestamp and end timestamp are identical since it is a flush
+        String baseName = getNextComponentSequence(btreeFilter);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
     }

     @Override
-    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName)
-            throws HyracksDataException {
-        String[] firstTimestampRange = firstFileName.split(DELIMITER);
-        String[] lastTimestampRange = lastFileName.split(DELIMITER);
-        String baseName = firstTimestampRange[0] + DELIMITER + lastTimestampRange[1];
-        // Get the range of timestamps by taking the earliest and the latest
-        // timestamps
+    public LSMComponentFileReferences getRelMergeFileReference(String firstFileName, String lastFileName) {
+        final String baseName = IndexComponentFileReference.getMergeSequence(firstFileName, lastFileName);
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
@@ -82,9 +76,9 @@
     @Override
     public List<LSMComponentFileReferences> cleanupAndGetValidFiles() throws HyracksDataException {
         List<LSMComponentFileReferences> validFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allRTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBTreeFiles = new ArrayList<>();
-        ArrayList<ComparableFileName> allBloomFilterFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allRTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBTreeFiles = new ArrayList<>();
+        ArrayList<IndexComponentFileReference> allBloomFilterFiles = new ArrayList<>();

         // Create a transaction filter <- to hide transaction components->
         FilenameFilter transactionFilter = getTransactionFileFilter(false);
@@ -93,9 +87,8 @@
         cleanupAndGetValidFilesInternal(getCompoundFilter(transactionFilter, btreeFilter), btreeFactory, allBTreeFiles,
                 btreeFactory.getBufferCache());
         HashSet<String> btreeFilesSet = new HashSet<>();
-        for (ComparableFileName cmpFileName : allBTreeFiles) {
-            int index = cmpFileName.fileName.lastIndexOf(DELIMITER);
-            btreeFilesSet.add(cmpFileName.fileName.substring(0, index));
+        for (IndexComponentFileReference cmpFileName : allBTreeFiles) {
+            btreeFilesSet.add(cmpFileName.getSequence());
         }
         validateFiles(btreeFilesSet, allRTreeFiles, getCompoundFilter(transactionFilter, rtreeFilter), rtreeFactory,
                 btreeFactory.getBufferCache());
@@ -113,52 +106,47 @@
         }

         if (allRTreeFiles.size() == 1 && allBTreeFiles.size() == 1 && allBloomFilterFiles.size() == 1) {
-            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).fileRef, allBTreeFiles.get(0).fileRef,
-                    allBloomFilterFiles.get(0).fileRef));
+            validFiles.add(new LSMComponentFileReferences(allRTreeFiles.get(0).getFileRef(),
+                    allBTreeFiles.get(0).getFileRef(), allBloomFilterFiles.get(0).getFileRef()));
             return validFiles;
         }

-        // Sorts files names from earliest to latest timestamp.
+        // Sorts files names from earliest to latest sequence.
         Collections.sort(allRTreeFiles);
         Collections.sort(allBTreeFiles);
         Collections.sort(allBloomFilterFiles);

-        List<ComparableFileName> validComparableRTreeFiles = new ArrayList<>();
-        ComparableFileName lastRTree = allRTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableRTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastRTree = allRTreeFiles.get(0);
         validComparableRTreeFiles.add(lastRTree);

-        List<ComparableFileName> validComparableBTreeFiles = new ArrayList<>();
-        ComparableFileName lastBTree = allBTreeFiles.get(0);
+        List<IndexComponentFileReference> validComparableBTreeFiles = new ArrayList<>();
+        IndexComponentFileReference lastBTree = allBTreeFiles.get(0);
         validComparableBTreeFiles.add(lastBTree);

-        List<ComparableFileName> validComparableBloomFilterFiles = new ArrayList<>();
-        ComparableFileName lastBloomFilter = allBloomFilterFiles.get(0);
+        List<IndexComponentFileReference> validComparableBloomFilterFiles = new ArrayList<>();
+        IndexComponentFileReference lastBloomFilter = allBloomFilterFiles.get(0);
         validComparableBloomFilterFiles.add(lastBloomFilter);

         for (int i = 1; i < allRTreeFiles.size(); i++) {
-            ComparableFileName currentRTree = allRTreeFiles.get(i);
-            ComparableFileName currentBTree = allBTreeFiles.get(i);
-            ComparableFileName currentBloomFilter = allBloomFilterFiles.get(i);
-            // Current start timestamp is greater than last stop timestamp.
-            if (currentRTree.interval[0].compareTo(lastRTree.interval[1]) > 0
-                    && currentBTree.interval[0].compareTo(lastBTree.interval[1]) > 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[1]) > 0) {
+            IndexComponentFileReference currentRTree = allRTreeFiles.get(i);
+            IndexComponentFileReference currentBTree = allBTreeFiles.get(i);
+            IndexComponentFileReference currentBloomFilter = allBloomFilterFiles.get(i);
+            // Current start sequence is greater than last stop sequence.
+            if (currentRTree.isMoreRecentThan(lastRTree) && currentBTree.isMoreRecentThan(lastBTree)
+                    && currentBloomFilter.isMoreRecentThan(lastBloomFilter)) {
                 validComparableRTreeFiles.add(currentRTree);
                 validComparableBTreeFiles.add(currentBTree);
                 validComparableBloomFilterFiles.add(currentBloomFilter);
                 lastRTree = currentRTree;
                 lastBTree = currentBTree;
                 lastBloomFilter = currentBloomFilter;
-            } else if (currentRTree.interval[0].compareTo(lastRTree.interval[0]) >= 0
-                    && currentRTree.interval[1].compareTo(lastRTree.interval[1]) <= 0
-                    && currentBTree.interval[0].compareTo(lastBTree.interval[0]) >= 0
-                    && currentBTree.interval[1].compareTo(lastBTree.interval[1]) <= 0
-                    && currentBloomFilter.interval[0].compareTo(lastBloomFilter.interval[0]) >= 0
-                    && currentBloomFilter.interval[1].compareTo(lastBloomFilter.interval[1]) <= 0) {
-                // Invalid files are completely contained in last interval.
-                delete(treeFactory.getBufferCache(), currentRTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBTree.fullPath);
-                delete(treeFactory.getBufferCache(), currentBloomFilter.fullPath);
+            } else if (currentRTree.isWithin(lastRTree) && currentBTree.isWithin(lastBTree)
+                    && currentBloomFilter.isWithin(lastBloomFilter)) {
+                // Invalid files are completely contained in last sequence.
+                delete(treeFactory.getBufferCache(), currentRTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBTree.getFullPath());
+                delete(treeFactory.getBufferCache(), currentBloomFilter.getFullPath());
             } else {
                 // This scenario should not be possible.
                 throw HyracksDataException.create(ErrorCode.FOUND_OVERLAPPING_LSM_FILES, baseDir);
@@ -167,29 +155,28 @@

         // Sort valid files in reverse lexicographical order, such that newer
         // files come first.
-        Collections.sort(validComparableRTreeFiles, recencyCmp);
-        Collections.sort(validComparableBTreeFiles, recencyCmp);
-        Collections.sort(validComparableBloomFilterFiles, recencyCmp);
+        validComparableRTreeFiles.sort(recencyCmp);
+        validComparableBTreeFiles.sort(recencyCmp);
+        validComparableBloomFilterFiles.sort(recencyCmp);

-        Iterator<ComparableFileName> rtreeFileIter = validComparableRTreeFiles.iterator();
-        Iterator<ComparableFileName> btreeFileIter = validComparableBTreeFiles.iterator();
-        Iterator<ComparableFileName> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
+        Iterator<IndexComponentFileReference> rtreeFileIter = validComparableRTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> btreeFileIter = validComparableBTreeFiles.iterator();
+        Iterator<IndexComponentFileReference> bloomFilterFileIter = validComparableBloomFilterFiles.iterator();
         while (rtreeFileIter.hasNext() && btreeFileIter.hasNext()) {
-            ComparableFileName cmpRTreeFileName = rtreeFileIter.next();
-            ComparableFileName cmpBTreeFileName = btreeFileIter.next();
-            ComparableFileName cmpBloomFilterFileName = bloomFilterFileIter.next();
-            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.fileRef, cmpBTreeFileName.fileRef,
-                    cmpBloomFilterFileName.fileRef));
+            IndexComponentFileReference cmpRTreeFileName = rtreeFileIter.next();
+            IndexComponentFileReference cmpBTreeFileName = btreeFileIter.next();
+            IndexComponentFileReference cmpBloomFilterFileName = bloomFilterFileIter.next();
+            validFiles.add(new LSMComponentFileReferences(cmpRTreeFileName.getFileRef(), cmpBTreeFileName.getFileRef(),
+                    cmpBloomFilterFileName.getFileRef()));
         }
         return validFiles;
     }

     @Override
     public LSMComponentFileReferences getNewTransactionFileReference() throws IOException {
-        String ts = getCurrentTimestamp();
+        String baseName = getNextComponentSequence(btreeFilter);
         // Create transaction lock file
-        Files.createFile(Paths.get(baseDir + TXN_PREFIX + ts));
-        String baseName = ts +
DELIMITER + ts;
+        Files.createFile(Paths.get(baseDir + TXN_PREFIX + baseName));
         return new LSMComponentFileReferences(baseDir.getChild(baseName + DELIMITER + RTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BTREE_SUFFIX),
                 baseDir.getChild(baseName + DELIMITER + BLOOM_FILTER_SUFFIX));
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
index 8ecbcc4..c255ee5 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-common-test/src/test/java/org/apache/hyracks/storage/am/lsm/common/component/TestLsmIndexFileManager.java
@@ -22,15 +22,18 @@
 import java.io.FilenameFilter;
 import java.util.ArrayList;

-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentFileReferences;
 import org.apache.hyracks.storage.am.lsm.common.impls.TreeIndexFactory;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;

 public class TestLsmIndexFileManager extends AbstractLSMIndexFileManager {
+
+    private long componentSeq = 0;

     public TestLsmIndexFileManager(IIOManager ioManager, FileReference file,
             TreeIndexFactory<?
extends ITreeIndex> treeIndexFactory) {
@@ -39,12 +42,18 @@

     @Override
     protected void cleanupAndGetValidFilesInternal(FilenameFilter filter,
-            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<ComparableFileName> allFiles,
-            IBufferCache bufferCache) throws HyracksDataException {
+            TreeIndexFactory<? extends ITreeIndex> treeFactory, ArrayList<IndexComponentFileReference> allFiles,
+            IBufferCache bufferCache) {
         String[] files = baseDir.getFile().list(filter);
         for (String fileName : files) {
             FileReference fileRef = baseDir.getChild(fileName);
-            allFiles.add(new ComparableFileName(fileRef));
+            allFiles.add(IndexComponentFileReference.of(fileRef));
         }
     }
+
+    @Override
+    public LSMComponentFileReferences getRelFlushFileReference() {
+        String sequence = IndexComponentFileReference.getFlushSequence(componentSeq++);
+        return new LSMComponentFileReferences(baseDir.getChild(sequence), null, null);
+    }
 }
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2927
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I9dff8ffb38ce8064a199d03b070ed1f5b924b8a4
Gerrit-PatchSet: 10
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Luo Chen <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
