ibessonov commented on code in PR #7610:
URL: https://github.com/apache/ignite-3/pull/7610#discussion_r2839570152
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -92,6 +95,12 @@
* @see SegmentFileManager
*/
class IndexFileManager {
+ /**
+ * Maximum number of times we try to read data from a segment file
returned based the index before giving up and throwing an
Review Comment:
I don't understand this sentence. `a segment file returned based the index`
- are there words missing?
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.lang.Math.toIntExact;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
+import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Garbage Collector for Raft log segment files.
+ *
+ * <p>The garbage collector performs compaction of segment files by removing
truncated log entries and creating new generations
+ * of segment files. This process reclaims disk space occupied by log entries
that have been truncated via
+ * {@link LogStorage#truncatePrefix} or {@link LogStorage#truncateSuffix}
operations.
+ *
+ * <h2>Compaction Process</h2>
+ * When a segment file is selected for compaction, the GC:
+ * <ol>
+ * <li>Copies non-truncated entries to a new segment file with an
incremented generation number</li>
+ * <li>Creates a new index file for the new generation</li>
+ * <li>Atomically replaces the old segment file with the new one</li>
+ * <li>Deletes the old segment file and its index file</li>
+ * </ol>
+ */
+class RaftLogGarbageCollector {
+ private static final IgniteLogger LOG =
Loggers.forClass(RaftLogGarbageCollector.class);
+
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+
+ private final Path segmentFilesDir;
+
+ private final IndexFileManager indexFileManager;
+
+ private final GroupInfoProvider groupInfoProvider;
+
+ private final AtomicLong logSize = new AtomicLong();
+
+ RaftLogGarbageCollector(
+ Path segmentFilesDir,
+ IndexFileManager indexFileManager,
+ GroupInfoProvider groupInfoProvider
+ ) {
+ this.segmentFilesDir = segmentFilesDir;
+ this.indexFileManager = indexFileManager;
+ this.groupInfoProvider = groupInfoProvider;
+ }
+
+ void cleanupLeftoverFiles() throws IOException {
+ FileProperties prevFileProperties = null;
+
+ try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+ Iterator<Path> it = segmentFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path segmentFile = it.next();
+
+ if
(segmentFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+ LOG.info("Deleting temporary segment file: {}.",
segmentFile);
+
+ Files.delete(segmentFile);
+ } else {
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFile);
+
+ if (prevFileProperties != null &&
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+ Path prevPath =
segmentFilesDir.resolve(SegmentFile.fileName(prevFileProperties));
+
+ LOG.info("Deleting segment file {} because it has a
higher generation version.", prevPath);
+
+ Files.delete(prevPath);
+ }
+
+ prevFileProperties = fileProperties;
+ }
+ }
+ }
+
+ // Do the same routine but for index files.
+ prevFileProperties = null;
+
+ try (Stream<Path> indexFiles =
Files.list(indexFileManager.indexFilesDir())) {
+ Iterator<Path> it = indexFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path indexFile = it.next();
+
+ FileProperties fileProperties = indexFileProperties(indexFile);
+
+                // Temporary index files are not created by the GC, they are
created by the index manager and are cleaned up by it.
Review Comment:
Should we also delete index files that have no corresponding segment files?
I'm afraid we may have a leak here, in theory
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -207,15 +223,47 @@ private Path saveIndexMemtable(
}
}
- return syncAndRename(tmpFilePath,
tmpFilePath.resolveSibling(fileName));
+ syncAndRename(tmpFilePath, tmpFilePath.resolveSibling(fileName));
+
+ return fileHeaderWithIndexMetas.indexMetas();
}
/**
* This method is intended to be called during {@link SegmentFileManager}
recovery in order to create index files that may have been
* lost due to a component stop before a checkpoint was able to complete.
*/
void recoverIndexFile(ReadModeIndexMemTable indexMemTable, FileProperties
fileProperties) throws IOException {
- saveIndexMemtable(indexMemTable, fileProperties, true);
+ // On recovery we are only creating missing index files, in-memory
meta will be created on Index File Manager start.
+ // (see recoverIndexFileMetas).
+ saveIndexMemtable(indexFilePath(fileProperties), indexMemTable,
fileProperties);
+ }
+
+ Path onIndexFileCompacted(
+ ReadModeIndexMemTable indexMemTable,
+ FileProperties oldIndexFileProperties,
+ FileProperties newIndexFileProperties
+ ) throws IOException {
+ Path newIndexFilePath = indexFilePath(newIndexFileProperties);
+
+ List<IndexMetaSpec> metaSpecs = saveIndexMemtable(newIndexFilePath,
indexMemTable, newIndexFileProperties);
+
+ metaSpecs.forEach(metaSpec -> {
+ GroupIndexMeta groupIndexMeta =
groupIndexMetas.get(metaSpec.groupId);
+
+ IndexFileMeta meta = metaSpec.indexFileMeta();
+
+ if (groupIndexMeta != null && meta != null) {
+ groupIndexMeta.onIndexCompacted(oldIndexFileProperties, meta);
+ }
+ });
+
+ LOG.info("New index file created after compaction: {}.",
newIndexFilePath);
Review Comment:
I think that we should use `Foo [key=value]` format according to our
guidelines. I know that this rule is violated everywhere, but if it's possible
to change new code according to this rule then I would appreciate it.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java:
##########
@@ -163,4 +163,57 @@ firstLogIndexKept, firstLogIndexInclusive(),
lastLogIndexExclusive()
return new IndexFileMetaArray(newArray, newSize);
}
+
+ IndexFileMetaArray onIndexCompacted(FileProperties oldProperties,
IndexFileMeta newMeta) {
+ // Find index meta associated with the file being compacted.
+ int updateIndex = -1;
+
+ for (int i = 0; i < size; i++) {
+ if (array[i].indexFileProperties().equals(oldProperties)) {
+ updateIndex = i;
+ break;
Review Comment:
Is it possible to go directly to `i` without iteration? Must be computable,
I'm pretty sure
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -238,14 +291,14 @@ SegmentFilePointer getSegmentFilePointer(long groupId,
long logIndex) throws IOE
Path indexFile =
indexFilesDir.resolve(indexFileName(indexFileMeta.indexFileProperties()));
- // Index file payload is a 0-based array, which indices correspond to
the [fileMeta.firstLogIndex, fileMeta.lastLogIndex) range.
- long payloadArrayIndex = logIndex -
indexFileMeta.firstLogIndexInclusive();
+ try (SeekableByteChannel channel = Files.newByteChannel(indexFile,
StandardOpenOption.READ)) {
Review Comment:
Are we doing it on every read? Seems rather expensive, opening a file takes
longer than actually reading from it I believe. Are there plans to reuse opened
channels? Maybe also make them accessible as `mmap` buffers too, it would
reduce the number of allocations.
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -204,10 +214,10 @@ void start() throws IOException {
// Create missing index files.
FileProperties segmentFileProperties =
SegmentFile.fileProperties(segmentFilePath);
- if
(!indexFileManager.indexFileExists(segmentFileProperties)) {
+ if
(!Files.exists(indexFileManager.indexFilePath(segmentFileProperties))) {
Review Comment:
A note for the future: this looks like one of the places that we may need to
optimize in the future. Having a large number of file system accesses can be
expensive, and I'd rather see a single `dir` call and then a lookup into the
set of files. Not now, of course. Just keep in mind that we may have hundreds
of files in these directories
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -96,6 +96,12 @@ class SegmentFileManager implements ManuallyCloseable {
private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
+ /**
+ * Maximum number of times we try to read data from a segment file
returned based the index before giving up and throwing an
+ * exception. See {@link #readFromOtherSegmentFiles} for more information.
+ */
+ private static final int MAX_NUM_INDEX_FILE_READ_RETRIES = 5;
Review Comment:
I didn't ask the last time. Why do we need to have attempts? Isn't there a
"happens before"? If old segment/index is deleted then new segment/index is
already there, should be safe to just read it. If it's also deleted for some
reason, then we can do it indefinitely I guess, there's no way GC is so fast
and will just increment generation of the same file over and over again
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileManager.java:
##########
@@ -370,7 +430,13 @@ private byte[]
serializeHeaderAndFillMetadata(ReadModeIndexMemTable indexMemTabl
return new IndexFileMeta(firstIndexKept, lastLogIndexExclusive,
adjustedPayloadOffset, fileProperties);
}
- private void putIndexFileMeta(Long groupId, @Nullable IndexFileMeta
indexFileMeta, long firstIndexKept) {
+ private void putIndexFileMeta(IndexMetaSpec metaSpec) {
+ IndexFileMeta indexFileMeta = metaSpec.indexFileMeta();
+
+ Long groupId = metaSpec.groupId();
Review Comment:
I wonder why it's `Long` instead of `long`, could you please clarify this
moment? It's not even nullable, as far as I see
##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollector.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static java.lang.Math.toIntExact;
+import static java.nio.file.StandardOpenOption.CREATE_NEW;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.ignite.internal.raft.storage.segstore.IndexFileManager.indexFileProperties;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.HEADER_RECORD;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayload.CRC_SIZE_BYTES;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.endOfSegmentReached;
+import static
org.apache.ignite.internal.raft.storage.segstore.SegmentPayloadParser.validateSegmentFileHeader;
+import static org.apache.ignite.internal.util.IgniteUtils.atomicMoveFile;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.raft.util.VarlenEncoder;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Garbage Collector for Raft log segment files.
+ *
+ * <p>The garbage collector performs compaction of segment files by removing
truncated log entries and creating new generations
+ * of segment files. This process reclaims disk space occupied by log entries
that have been truncated via
+ * {@link LogStorage#truncatePrefix} or {@link LogStorage#truncateSuffix}
operations.
+ *
+ * <h2>Compaction Process</h2>
+ * When a segment file is selected for compaction, the GC:
+ * <ol>
+ * <li>Copies non-truncated entries to a new segment file with an
incremented generation number</li>
+ * <li>Creates a new index file for the new generation</li>
+ * <li>Atomically replaces the old segment file with the new one</li>
+ * <li>Deletes the old segment file and its index file</li>
+ * </ol>
+ */
+class RaftLogGarbageCollector {
+ private static final IgniteLogger LOG =
Loggers.forClass(RaftLogGarbageCollector.class);
+
+ private static final String TMP_FILE_SUFFIX = ".tmp";
+
+ private final Path segmentFilesDir;
+
+ private final IndexFileManager indexFileManager;
+
+ private final GroupInfoProvider groupInfoProvider;
+
+ private final AtomicLong logSize = new AtomicLong();
+
+ RaftLogGarbageCollector(
+ Path segmentFilesDir,
+ IndexFileManager indexFileManager,
+ GroupInfoProvider groupInfoProvider
+ ) {
+ this.segmentFilesDir = segmentFilesDir;
+ this.indexFileManager = indexFileManager;
+ this.groupInfoProvider = groupInfoProvider;
+ }
+
+ void cleanupLeftoverFiles() throws IOException {
+ FileProperties prevFileProperties = null;
+
+ try (Stream<Path> segmentFiles = Files.list(segmentFilesDir)) {
+ Iterator<Path> it = segmentFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path segmentFile = it.next();
+
+ if
(segmentFile.getFileName().toString().endsWith(TMP_FILE_SUFFIX)) {
+ LOG.info("Deleting temporary segment file: {}.",
segmentFile);
+
+ Files.delete(segmentFile);
+ } else {
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFile);
+
+ if (prevFileProperties != null &&
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+ Path prevPath =
segmentFilesDir.resolve(SegmentFile.fileName(prevFileProperties));
+
+ LOG.info("Deleting segment file {} because it has a
higher generation version.", prevPath);
+
+ Files.delete(prevPath);
+ }
+
+ prevFileProperties = fileProperties;
+ }
+ }
+ }
+
+ // Do the same routine but for index files.
+ prevFileProperties = null;
+
+ try (Stream<Path> indexFiles =
Files.list(indexFileManager.indexFilesDir())) {
+ Iterator<Path> it = indexFiles.sorted().iterator();
+
+ while (it.hasNext()) {
+ Path indexFile = it.next();
+
+ FileProperties fileProperties = indexFileProperties(indexFile);
+
+                // Temporary index files are not created by the GC, they are
created by the index manager and are cleaned up by it.
+ if
(!Files.exists(segmentFilesDir.resolve(SegmentFile.fileName(fileProperties)))) {
+ LOG.info("Deleting index file {} because the corresponding
segment file does not exist.", indexFile);
+
+ Files.delete(indexFile);
+ } else if (prevFileProperties != null &&
prevFileProperties.ordinal() == fileProperties.ordinal()) {
+ Path prevPath =
indexFileManager.indexFilePath(prevFileProperties);
+
+ LOG.info("Deleting index file {} because it has a higher
generation version.", prevPath);
+
+ Files.deleteIfExists(prevPath);
+ }
+
+ prevFileProperties = fileProperties;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void compactSegmentFile(SegmentFile segmentFile) throws IOException {
Review Comment:
I'm in the middle of the review, so I don't yet know what will be next. But,
I have a question - is it possible in current architecture to detect that a
segment file has no useful data and just delete it, without opening it for
reading?
##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java:
##########
@@ -277,4 +279,145 @@ void
testTruncatePrefixRemovesAllEntriesWhenKeptAfterLast() {
assertThat(groupMeta.indexMeta(19), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
}
+
+ @Test
+ void testOnIndexCompacted() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ var compactedMeta2 = new IndexFileMeta(50, 100, 42, new
FileProperties(1, 1));
+ groupMeta.onIndexCompacted(new FileProperties(1), compactedMeta2);
+
+ assertThat(groupMeta.indexMeta(1), is(meta1));
+ assertThat(groupMeta.indexMeta(50), is(compactedMeta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ }
+
+ @Test
+ void testOnIndexRemoved() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ groupMeta.onIndexRemoved(new FileProperties(1));
+
+ assertThat(groupMeta.indexMeta(1), is(meta1));
+ assertThat(groupMeta.indexMeta(50), is(nullValue()));
+ assertThat(groupMeta.indexMeta(99), is(nullValue()));
Review Comment:
This is a strange test, why is it possible to read `1` but not `50`? `1`
should also be impossible then, there can be no gaps in raft log
##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
+class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
+ private static final int FILE_SIZE = 200;
+
+ private static final long GROUP_ID_1 = 1000;
+
+ private static final long GROUP_ID_2 = 2000;
+
+ private static final int STRIPES = 10;
+
+ private static final String NODE_NAME = "test";
+
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
+ private LogStorageConfiguration storageConfiguration;
+
+ private SegmentFileManager fileManager;
+
+ private IndexFileManager indexFileManager;
+
+ private RaftLogGarbageCollector garbageCollector;
+
+ @Mock
+ private GroupInfoProvider groupInfoProvider;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ groupInfoProvider,
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.start();
+
+ indexFileManager = fileManager.indexFileManager();
+
+ garbageCollector = fileManager.garbageCollector();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (fileManager != null) {
+ fileManager.close();
+ }
+ }
+
+ @Test
+ void testCompactSegmentFileWithAllEntriesTruncated() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ // This is equivalent to prefix truncation up to the latest index.
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+ List<Path> segmentFiles = segmentFiles();
+
+ Path segmentFilePath = segmentFiles.get(0);
+
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFilePath);
+
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
false);
Review Comment:
Is it possible to use `try-with-resources`?
##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMetaTest.java:
##########
@@ -277,4 +279,145 @@ void
testTruncatePrefixRemovesAllEntriesWhenKeptAfterLast() {
assertThat(groupMeta.indexMeta(19), is(nullValue()));
assertThat(groupMeta.firstLogIndexInclusive(), is(-1L));
}
+
+ @Test
+ void testOnIndexCompacted() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ var compactedMeta2 = new IndexFileMeta(50, 100, 42, new
FileProperties(1, 1));
+ groupMeta.onIndexCompacted(new FileProperties(1), compactedMeta2);
+
+ assertThat(groupMeta.indexMeta(1), is(meta1));
+ assertThat(groupMeta.indexMeta(50), is(compactedMeta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ }
+
+ @Test
+ void testOnIndexRemoved() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta2 = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(2));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta2);
+ groupMeta.addIndexMeta(meta3);
+
+ groupMeta.onIndexRemoved(new FileProperties(1));
+
+ assertThat(groupMeta.indexMeta(1), is(meta1));
+ assertThat(groupMeta.indexMeta(50), is(nullValue()));
+ assertThat(groupMeta.indexMeta(99), is(nullValue()));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+ assertThat(groupMeta.lastLogIndexExclusive(), is(150L));
+ }
+
+ @Test
+ void testOnIndexCompactedWithMultipleBlocks() {
+ // meta1 is in block 0.
+ var meta1 = new IndexFileMeta(1, 100, 0, new FileProperties(0));
+ var groupMeta = new GroupIndexMeta(meta1);
+
+ // meta2 overlaps meta1, creating a second block in the deque.
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(1));
+ groupMeta.addIndexMeta(meta2);
+
+ // meta3 is added to the second block (consecutive to meta2).
+ var meta3 = new IndexFileMeta(100, 200, 84, new FileProperties(2));
+ groupMeta.addIndexMeta(meta3);
+
+ // Compact meta1 from the older block.
+ var compactedMeta1 = new IndexFileMeta(1, 100, 0, new
FileProperties(0, 1));
+ groupMeta.onIndexCompacted(new FileProperties(0), compactedMeta1);
+
+ assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMeta(42), is(meta2));
+ assertThat(groupMeta.indexMeta(100), is(meta3));
+
+ // Compact meta3 from the newer block.
+ var compactedMeta3 = new IndexFileMeta(100, 200, 84, new
FileProperties(2, 1));
+ groupMeta.onIndexCompacted(new FileProperties(2), compactedMeta3);
+
+ assertThat(groupMeta.indexMeta(1), is(compactedMeta1));
+ assertThat(groupMeta.indexMeta(42), is(meta2));
+ assertThat(groupMeta.indexMeta(100), is(compactedMeta3));
+ }
+
+ @Test
+ void testOnIndexRemovedWithMultipleBlocks() {
+ var meta1 = new IndexFileMeta(1, 50, 0, new FileProperties(0));
+ var meta1b = new IndexFileMeta(50, 100, 42, new FileProperties(1));
+
+ var groupMeta = new GroupIndexMeta(meta1);
+ groupMeta.addIndexMeta(meta1b);
+
+ var meta2 = new IndexFileMeta(42, 100, 42, new FileProperties(2));
+ groupMeta.addIndexMeta(meta2);
+
+ var meta3 = new IndexFileMeta(100, 150, 84, new FileProperties(3));
+ groupMeta.addIndexMeta(meta3);
+
+ groupMeta.onIndexRemoved(new FileProperties(2));
+
+ assertThat(groupMeta.indexMeta(42), is(meta1));
Review Comment:
Now it's getting difficult. I would appreciate descriptions in javadocs.
How is it even possible to remove an in-between meta? This is illegal, at
least with an index that had some data. I clearly don't understand something.
Currently it doesn't look like "testing the contract", it looks more like
"testing side-effects of the implementation", which seems strange. I hope you
understand where I come from with this
##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogGarbageCollectorTest.java:
##########
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.segstore;
+
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.failure.NoOpFailureManager;
+import org.apache.ignite.internal.lang.RunnableX;
+import org.apache.ignite.internal.raft.configuration.LogStorageConfiguration;
+import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import
org.apache.ignite.internal.raft.storage.segstore.GroupInfoProvider.GroupInfo;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests for {@link RaftLogGarbageCollector}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+@ExtendWith(MockitoExtension.class)
+class RaftLogGarbageCollectorTest extends IgniteAbstractTest {
+ private static final int FILE_SIZE = 200;
+
+ private static final long GROUP_ID_1 = 1000;
+
+ private static final long GROUP_ID_2 = 2000;
+
+ private static final int STRIPES = 10;
+
+ private static final String NODE_NAME = "test";
+
+ @InjectConfiguration
+ private RaftConfiguration raftConfiguration;
+
+ @InjectConfiguration("mock.segmentFileSizeBytes=" + FILE_SIZE)
+ private LogStorageConfiguration storageConfiguration;
+
+ private SegmentFileManager fileManager;
+
+ private IndexFileManager indexFileManager;
+
+ private RaftLogGarbageCollector garbageCollector;
+
+ @Mock
+ private GroupInfoProvider groupInfoProvider;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ fileManager = new SegmentFileManager(
+ NODE_NAME,
+ workDir,
+ STRIPES,
+ new NoOpFailureManager(),
+ groupInfoProvider,
+ raftConfiguration,
+ storageConfiguration
+ );
+
+ fileManager.start();
+
+ indexFileManager = fileManager.indexFileManager();
+
+ garbageCollector = fileManager.garbageCollector();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (fileManager != null) {
+ fileManager.close();
+ }
+ }
+
+ @Test
+ void testCompactSegmentFileWithAllEntriesTruncated() throws Exception {
+ List<byte[]> batches = createRandomData(FILE_SIZE / 4, 10);
+ for (int i = 0; i < batches.size(); i++) {
+ appendBytes(GROUP_ID_1, batches.get(i), i);
+ }
+
+ await().until(this::indexFiles, hasSize(greaterThan(0)));
+
+ // This is equivalent to prefix truncation up to the latest index.
+ when(groupInfoProvider.groupInfo(GROUP_ID_1)).thenReturn(new
GroupInfo(batches.size() - 1, batches.size() - 1));
+
+ List<Path> segmentFiles = segmentFiles();
+
+ Path segmentFilePath = segmentFiles.get(0);
+
+ FileProperties fileProperties =
SegmentFile.fileProperties(segmentFilePath);
+
+ SegmentFile segmentFile = SegmentFile.openExisting(segmentFilePath,
false);
+ try {
+ garbageCollector.compactSegmentFile(segmentFile);
+ } finally {
+ segmentFile.close();
+ }
+
+ assertThat(Files.exists(segmentFilePath), is(false));
Review Comment:
There's a `org.hamcrest.io.FileMatchers` but it doesn't support `Path`
natively, you'd have to write `toFile()`. Up to you, whatever will look better
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]