ibessonov commented on code in PR #6753:
URL: https://github.com/apache/ignite-3/pull/6753#discussion_r2439482687


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java:
##########
@@ -64,6 +64,22 @@ public SegmentInfo segmentInfo(long groupId) {
         return stripe(groupId).memTable.get(groupId);
     }
 
+    @Override
+    public void truncateSuffix(long groupId, long lastLogIndexKept) {
+        ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;
+
+        SegmentInfo segmentInfo = memtable.get(groupId);
+
+        if (segmentInfo == null || lastLogIndexKept < segmentInfo.firstLogIndexInclusive()) {
+            // If the current memtable does not have information for the given group or if we are truncating everything currently present
+            // in the memtable, we need to write a special "empty" SegmentInfo into the memtable to override existing persisted data during
+            // search.
+            memtable.put(groupId, new SegmentInfo(lastLogIndexKept + 1));

Review Comment:
   There's another implicit reason to do so: `SegmentInfo#logIndexBase` is 
final, so the "head" of the array is immutable. If you change it, you must 
create a new array. Worth mentioning in the comment, I suppose.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexMemTable.java:
##########
@@ -64,6 +64,22 @@ public SegmentInfo segmentInfo(long groupId) {
         return stripe(groupId).memTable.get(groupId);
     }
 
+    @Override
+    public void truncateSuffix(long groupId, long lastLogIndexKept) {
+        ConcurrentMap<Long, SegmentInfo> memtable = stripe(groupId).memTable;

Review Comment:
   How often do we need to resolve the `groupId` into the stripe? Just a 
general question. If this method is already called by a log manager of that 
specific group, then this is a waste of CPU time. If that is the case, how 
hard would it be to improve the situation in a separate PR?
   
   EDIT: this applies to the next line as well, of course: a concurrent map 
lookup and implicit boxing of the `long` value.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java:
##########
@@ -73,7 +73,15 @@ int size() {
     }
 
     long firstLogIndexInclusive() {
-        return array[0].firstLogIndexInclusive();
+        for (int i = 0; i < size; i++) {

Review Comment:
   I believe we need the same comment here. Maybe the javadoc of this 
method should mention what exactly it returns in various cases.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -56,6 +58,17 @@ ArrayWithSize add(int element) {
             return new ArrayWithSize(array, size + 1);
         }
 
+        ArrayWithSize truncate(int newSize) {
+            assert newSize <= size
+                    : String.format("Array must shrink on truncation, current size: %d, size after truncation: %d", size, newSize);
+
+            int[] newArray = new int[size];
+
+            System.arraycopy(array, 0, newArray, 0, newSize);

Review Comment:
   Please consider using `java.util.Arrays#copyOfRange(int[], int, int)`
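   
   A minimal sketch of what this could look like, assuming `ArrayWithSize` 
simply wraps an `int[]` and a `size`. The class below is a hypothetical 
stand-in, not the PR's code. `Arrays.copyOfRange(array, 0, newSize)` allocates 
exactly `newSize` elements and copies the prefix in one call:

```java
import java.util.Arrays;

/** Hypothetical stand-in for the PR's ArrayWithSize; only the truncation path is sketched. */
final class ArrayWithSizeSketch {
    final int[] array;
    final int size;

    ArrayWithSizeSketch(int[] array, int size) {
        this.array = array;
        this.size = size;
    }

    ArrayWithSizeSketch truncate(int newSize) {
        if (newSize < 0 || newSize > size) {
            throw new IllegalArgumentException(
                    String.format("Array must shrink on truncation, current size: %d, size after truncation: %d", size, newSize));
        }

        // Copies elements [0, newSize) into a fresh array whose length is exactly newSize.
        int[] newArray = Arrays.copyOfRange(array, 0, newSize);

        return new ArrayWithSizeSketch(newArray, newSize);
    }
}
```

   Note that the resulting array has no spare capacity, unlike the 
`new int[size]` allocation in the diff above.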



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java:
##########
@@ -137,7 +137,8 @@ long firstLogIndexInclusive(long groupId) {
         while (it.hasNext()) {
             SegmentInfo segmentInfo = it.next().memTable().segmentInfo(groupId);

Review Comment:
   I believe I'm asking the same question for the second time here. Is it 
possible that the constant need to resolve `groupId` into various structures 
will affect performance?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -159,4 +172,23 @@ void saveOffsetsTo(ByteBuffer buffer) {
 
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
+
+    void truncateSuffix(long lastLogIndexKept) {
+        ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+        long newSize = max(lastLogIndexKept - logIndexBase + 1, 0);
+
+        if (newSize >= segmentFileOffsets.size()) {
+            // Nothing to truncate.
+            return;
+        }
+
+        ArrayWithSize newSegmentFileOffsets = segmentFileOffsets.truncate((int) newSize);

Review Comment:
   Does that imply that literally the next write into this array will have to 
expand it right after the allocation?
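   
   To make the concern concrete, here is a rough sketch of the two options. 
Everything in it is hypothetical (class name, growth policy, method names), 
and I assume the array is shared between immutable snapshots, which is why 
truncation copies instead of reusing the backing array:

```java
import java.util.Arrays;

/** Hypothetical append-only int array; names and growth policy are assumptions for illustration. */
final class OffsetsSketch {
    final int[] array;
    final int size;

    OffsetsSketch(int[] array, int size) {
        this.array = array;
        this.size = size;
    }

    /** Appends an element, reallocating the backing array only when it is full. */
    OffsetsSketch add(int element) {
        int[] target = size == array.length ? Arrays.copyOf(array, Math.max(1, array.length * 2)) : array;

        target[size] = element;

        return new OffsetsSketch(target, size + 1);
    }

    /** Exact-size truncation: the very next add() has to reallocate. */
    OffsetsSketch truncateExact(int newSize) {
        return new OffsetsSketch(Arrays.copyOfRange(array, 0, newSize), newSize);
    }

    /** Capacity-preserving truncation: copies the kept prefix into an array of the old capacity. */
    OffsetsSketch truncateKeepingCapacity(int newSize) {
        int[] newArray = new int[array.length];

        System.arraycopy(array, 0, newArray, 0, newSize);

        return new OffsetsSketch(newArray, newSize);
    }
}
```

   With `truncateExact` the next `add` allocates again; with 
`truncateKeepingCapacity` it writes into the spare slots.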



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/RaftLogCheckpointer.java:
##########
@@ -137,7 +137,8 @@ long firstLogIndexInclusive(long groupId) {
         while (it.hasNext()) {
             SegmentInfo segmentInfo = it.next().memTable().segmentInfo(groupId);

Review Comment:
   I wonder why you iterate over multiple memtables in this method. I thought 
that a single checkpoint corresponds to only a single memtable. Is this 
assumption false?



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -159,4 +172,23 @@ void saveOffsetsTo(ByteBuffer buffer) {
 
         buffer.asIntBuffer().put(offsets.array, 0, offsets.size);
     }
+
+    void truncateSuffix(long lastLogIndexKept) {
+        ArrayWithSize segmentFileOffsets = this.segmentFileOffsets;
+
+        long newSize = max(lastLogIndexKept - logIndexBase + 1, 0);
+
+        if (newSize >= segmentFileOffsets.size()) {

Review Comment:
   How can it be larger? I'm a little bit confused



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IndexFileMetaArray.java:
##########
@@ -73,7 +73,15 @@ int size() {
     }
 
     long firstLogIndexInclusive() {
-        return array[0].firstLogIndexInclusive();
+        for (int i = 0; i < size; i++) {

Review Comment:
   I also have an optimization for you.
   If `array.first.firstLogIndexInclusive == array.last.lastLogIndexExclusive`, 
then you return `-1`.
   Otherwise you can return `array.first.firstLogIndexInclusive` without a 
loop. This requires only two lookups.
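   
   A sketch of the idea, under assumptions that are mine and not taken from 
the PR: the array is never empty, neighbouring metas are contiguous (each 
meta's `lastLogIndexExclusive` equals the next meta's 
`firstLogIndexInclusive`), and the loop in the diff skips empty metas. The 
class names are hypothetical stand-ins:

```java
/** Hypothetical stand-in for IndexFileMeta: a half-open range of log indexes. */
final class MetaSketch {
    final long firstLogIndexInclusive;
    final long lastLogIndexExclusive;

    MetaSketch(long firstLogIndexInclusive, long lastLogIndexExclusive) {
        this.firstLogIndexInclusive = firstLogIndexInclusive;
        this.lastLogIndexExclusive = lastLogIndexExclusive;
    }
}

/** Hypothetical stand-in for IndexFileMetaArray; assumes size >= 1 and contiguous metas. */
final class MetaArraySketch {
    private final MetaSketch[] array;
    private final int size;

    MetaArraySketch(MetaSketch... metas) {
        this.array = metas;
        this.size = metas.length;
    }

    /** Loop-based variant (my guess at the intent): first index of the first non-empty meta, or -1. */
    long firstLogIndexLoop() {
        for (int i = 0; i < size; i++) {
            if (array[i].firstLogIndexInclusive < array[i].lastLogIndexExclusive) {
                return array[i].firstLogIndexInclusive;
            }
        }

        return -1;
    }

    /** Two-lookup variant: with contiguous metas, the whole range is empty iff its two ends coincide. */
    long firstLogIndexTwoLookups() {
        long first = array[0].firstLogIndexInclusive;
        long last = array[size - 1].lastLogIndexExclusive;

        return first == last ? -1 : first;
    }
}
```

   Under the contiguity assumption, leading empty metas share their index 
with the first non-empty one, so both variants return the same value.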



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java:
##########
@@ -61,6 +70,21 @@ static void writeTo(
         buffer.putInt(crc);
     }
 
+    static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId, long lastLogIndexKept) {
+        int originalPos = buffer.position();
+
+        buffer
+                .putLong(groupId)

Review Comment:
   I struggle to find the place in the code where you set an explicit `order` 
on this buffer. There's one in `SegmentFile#buffer()`, but you don't use it 
here. Could you please point me to the right place? Thank you!
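   
   The reason I ask: in `java.nio`, a buffer obtained through `slice()` or 
`duplicate()` does not inherit the byte order of its source and starts out as 
`BIG_ENDIAN`. I don't know how the buffer reaches this method, so the snippet 
below is only an illustration with hypothetical helper names:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Illustration only: shows how byte order affects putLong and that views reset it. */
final class ByteOrderSketch {
    /** Returns a small heap buffer explicitly switched to little-endian. */
    static ByteBuffer littleEndianSource() {
        return ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
    }

    /** Writes a long at the buffer's current position and returns the first byte written. */
    static byte firstByteOfLong(ByteBuffer buffer, long value) {
        int pos = buffer.position();

        buffer.putLong(value);

        return buffer.get(pos);
    }

    /** Creates a view that keeps the source's byte order by re-applying it explicitly. */
    static ByteBuffer duplicateKeepingOrder(ByteBuffer source) {
        return source.duplicate().order(source.order());
    }
}
```

   Writing `1L` through a plain `duplicate()` of a little-endian buffer puts 
the low byte last, while `duplicateKeepingOrder` puts it first.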



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -165,32 +170,23 @@ private static SegmentFileWithMemtable convertToReadOnly(SegmentFileWithMemtable
     void appendEntry(long groupId, LogEntry entry, LogEntryEncoder encoder) throws IOException {
         int entrySize = encoder.size(entry);
 
-        if (entrySize > maxEntrySize()) {
+        if (entrySize > maxPossibleEntrySize()) {
             throw new IllegalArgumentException(String.format(
-                    "Entry size is too big (%d bytes), maximum allowed entry size: %d bytes.", entrySize, maxEntrySize()
+                    "Entry size is too big (%d bytes), maximum allowed entry size: %d bytes.", entrySize, maxPossibleEntrySize()
             ));
         }
 
         int payloadSize = SegmentPayload.size(entrySize);
 
-        while (true) {
-            SegmentFileWithMemtable segmentFileWithMemtable = currentSegmentFile();
-
-            try (WriteBuffer writeBuffer = segmentFileWithMemtable.segmentFile().reserve(payloadSize)) {
-                if (writeBuffer != null) {
-                    int segmentOffset = writeBuffer.buffer().position();
+        try (WriteBufferWithMemtable writeBufferWithMemtable = reserveBytesWithRollover(payloadSize)) {
+            ByteBuffer segmentBuffer = writeBufferWithMemtable.buffer();
 
-                    SegmentPayload.writeTo(writeBuffer.buffer(), groupId, entrySize, entry, encoder);
+            int segmentOffset = segmentBuffer.position();
 
-                    // Append to memtable before write buffer is released to avoid races with checkpoint on rollover.
-                    segmentFileWithMemtable.memtable().appendSegmentFileOffset(groupId, entry.getId().getIndex(), segmentOffset);
-
-                    return;
-                }
-            }
+            SegmentPayload.writeTo(segmentBuffer, groupId, entrySize, entry, encoder);
 
-            // Segment file does not have enough space. Try to switch to a new one and retry the write attempt.
-            initiateRollover(segmentFileWithMemtable);
+            // Append to memtable before write buffer is released to avoid races with checkpoint on rollover.
+            writeBufferWithMemtable.memtable.appendSegmentFileOffset(groupId, entry.getId().getIndex(), segmentOffset);

Review Comment:
   A question. Do we wait for preceding writes from other threads to finish 
inside the `writeTo` call? If that's the case, would it be better to do so 
after the append to the memtable?
   
   All I ask is to entertain this thought; no action required, I guess.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/GroupIndexMeta.java:
##########
@@ -19,57 +19,117 @@
 
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Represents in-memory meta information about a particular Raft group stored in an index file.
  */
 class GroupIndexMeta {
-    private static final VarHandle FILE_METAS_VH;
+    private static class IndexMetaArrayHolder {
+        private static final VarHandle FILE_METAS_VH;
 
-    static {
-        try {
-            FILE_METAS_VH = MethodHandles.lookup().findVarHandle(GroupIndexMeta.class, "fileMetas", IndexFileMetaArray.class);
-        } catch (ReflectiveOperationException e) {
-            throw new ExceptionInInitializerError(e);
+        static {
+            try {
+                FILE_METAS_VH = MethodHandles.lookup().findVarHandle(IndexMetaArrayHolder.class, "fileMetas", IndexFileMetaArray.class);
+            } catch (ReflectiveOperationException e) {
+                throw new ExceptionInInitializerError(e);
+            }
+        }
+
+        @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
+        volatile IndexFileMetaArray fileMetas;
+
+        IndexMetaArrayHolder(IndexFileMeta startFileMeta) {
+            this.fileMetas = new IndexFileMetaArray(startFileMeta);
+        }
+
+        void addIndexMeta(IndexFileMeta indexFileMeta) {
+            IndexFileMetaArray fileMetas = this.fileMetas;
+
+            IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+
+            // Simple assignment would suffice, since we only have one thread writing to this field, but we use compareAndSet to verify
+            // this invariant, just in case.
+            boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas, newFileMetas);
+
+            assert updated : "Concurrent writes detected";
         }
     }
 
-    @SuppressWarnings("FieldMayBeFinal") // Updated through a VarHandle.
-    private volatile IndexFileMetaArray fileMetas;
+    private final Deque<IndexMetaArrayHolder> fileMetaDeque = new ConcurrentLinkedDeque<>();
 
     GroupIndexMeta(IndexFileMeta startFileMeta) {
-        this.fileMetas = new IndexFileMetaArray(startFileMeta);
+        fileMetaDeque.add(new IndexMetaArrayHolder(startFileMeta));
     }
 
     void addIndexMeta(IndexFileMeta indexFileMeta) {
-        IndexFileMetaArray fileMetas = this.fileMetas;
+        IndexMetaArrayHolder curFileMetas = fileMetaDeque.getLast();
+
+        long curLastLogIndex = curFileMetas.fileMetas.lastLogIndexExclusive();
 
-        IndexFileMetaArray newFileMetas = fileMetas.add(indexFileMeta);
+        long newFirstLogIndex = indexFileMeta.firstLogIndexInclusive();
 
-        // Simple assignment would suffice, since we only have one thread writing to this field, but we use compareAndSet to verify
-        // this invariant, just in case.
-        boolean updated = FILE_METAS_VH.compareAndSet(this, fileMetas, newFileMetas);
+        assert newFirstLogIndex <= curLastLogIndex :
+                String.format(
+                        "Gaps between Index File Metas are not allowed. Last log index: %d, new log index: %d",
+                        curLastLogIndex, newFirstLogIndex
+                );
 
-        assert updated : "Concurrent writes detected";
+        // Merge consecutive index metas into a single meta block. If there's an overlap (e.g. due to log truncation), start a new block,
+        // which will override the previous one during search.
+        if (curLastLogIndex == newFirstLogIndex) {
+            curFileMetas.addIndexMeta(indexFileMeta);
+        } else {
+            fileMetaDeque.add(new IndexMetaArrayHolder(indexFileMeta));
+        }
     }
 
     /**
-     * Returns a file pointer that uniquely identifies the index file for the given log index. Returns {@code null} if the given log index
+     * Returns index file meta that uniquely identifies the index file for the given log index. Returns {@code null} if the given log index
      * is not found in any of the index files in this group.
      */
     @Nullable
     IndexFileMeta indexMeta(long logIndex) {
-        return fileMetas.find(logIndex);
+        Iterator<IndexMetaArrayHolder> it = fileMetaDeque.descendingIterator();
+
+        while (it.hasNext()) {
+            IndexFileMetaArray fileMetas = it.next().fileMetas;
+
+            // Log suffix might have been truncated, so we can have an entry on the top of the queue that cuts off part of the search range.
+            if (logIndex >= fileMetas.lastLogIndexExclusive()) {
+                return null;
+            }
+
+            if (logIndex < fileMetas.firstLogIndexInclusive()) {
+                continue;
+            }
+
+            IndexFileMeta indexMeta = fileMetas.find(logIndex);
+
+            if (indexMeta != null) {
+                return indexMeta;
+            }
+        }
+
+        return null;
     }
 
     long firstLogIndexInclusive() {
-        return fileMetas.get(0).firstLogIndexInclusive();
+        for (IndexMetaArrayHolder indexMetaArrayHolder : fileMetaDeque) {
+            IndexFileMetaArray fileMetas = indexMetaArrayHolder.fileMetas;
+
+            if (fileMetas.size() > 0) {
+                return fileMetas.firstLogIndexInclusive();
+            }

Review Comment:
   I believe that this condition is only needed to differentiate between an 
empty log and a non-empty log. Is that true? If the answer is yes, could you 
please add a small clarifying comment to the code? If the answer is no, then 
maybe it requires a larger comment. Thank you!
   
   EDIT: Now that I think about it, I don't really understand what exactly 
the `size` method returns. The size of the array? How can it be zero?


