Copilot commented on code in PR #6753:
URL: https://github.com/apache/ignite-3/pull/6753#discussion_r2439055179


##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentPayload.java:
##########
@@ -61,6 +70,21 @@ static void writeTo(
         buffer.putInt(crc);
     }
 
+    static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId, long lastLogIndexKept) {
+        int originalPos = buffer.position();
+
+        buffer
+                .putLong(groupId)
+                .putInt(TRUNCATE_SUFFIX_RECORD_MARKER)
+                .putLong(lastLogIndexKept);
+
+        buffer.position(originalPos);
+
+        int crc = FastCrc.calcCrc(buffer, TRUNCATE_SUFFIX_RECORD_SIZE - HASH_SIZE);
+
+        buffer.putInt(crc);
+    }

Review Comment:
   The CRC can end up at the wrong position. After buffer.position(originalPos),
buffer.putInt(crc) writes at the current position, so unless FastCrc.calcCrc
advances the position past the bytes it covers, the CRC overwrites the first
bytes of the record (the start of groupId) and the write position is left
inside the reserved record instead of at its end. In that case the record
layout is corrupted. To fix: compute the CRC over the fields just written
(mirroring writeTo) and write it with buffer.putInt(crc) at the end of the
record. If the position has to be reset to originalPos to compute the CRC,
move it back to originalPos + TRUNCATE_SUFFIX_RECORD_SIZE - HASH_SIZE before
calling buffer.putInt(crc).
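The fix suggested above can be sketched as follows. This is a minimal, self-contained sketch, not the PR's code: it uses java.util.zip.CRC32 as a stand-in for Ignite's FastCrc (whose position semantics are exactly what is in question), and the layout constants are assumptions: an 8-byte groupId, a 4-byte marker assumed to be 0 (matching the "payload length of 0" described in the SegmentFileManager Javadoc), an 8-byte lastLogIndexKept and a 4-byte CRC. The CRC is computed over a duplicate() of the buffer, so the original buffer's position is never moved backwards.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class TruncateSuffixRecordSketch {
    // Assumed layout: groupId (8) | marker (4) | lastLogIndexKept (8) | CRC (4).
    static final int HASH_SIZE = Integer.BYTES;
    static final int TRUNCATE_SUFFIX_RECORD_MARKER = 0; // assumption: "payload length of 0"
    static final int TRUNCATE_SUFFIX_RECORD_SIZE = Long.BYTES + Integer.BYTES + Long.BYTES + HASH_SIZE;

    static void writeTruncateSuffixRecordTo(ByteBuffer buffer, long groupId, long lastLogIndexKept) {
        int originalPos = buffer.position();

        buffer
                .putLong(groupId)
                .putInt(TRUNCATE_SUFFIX_RECORD_MARKER)
                .putLong(lastLogIndexKept);

        // Checksum the fields through a duplicate: it shares the bytes but has its own
        // position and limit, so the position of 'buffer' is never moved backwards.
        ByteBuffer covered = buffer.duplicate();
        covered.limit(originalPos + TRUNCATE_SUFFIX_RECORD_SIZE - HASH_SIZE);
        covered.position(originalPos);

        CRC32 crc = new CRC32(); // stand-in for FastCrc
        crc.update(covered);

        // 'buffer' still points just past lastLogIndexKept, i.e. at the CRC slot at the end of the record.
        buffer.putInt((int) crc.getValue());
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.position(10); // the record does not have to start at offset 0

        writeTruncateSuffixRecordTo(buffer, 7L, 42L);

        System.out.println(buffer.position());  // 34: 10 + 24-byte record
        System.out.println(buffer.getLong(10)); // 7: groupId is intact
    }
}
```

Because duplicate() has an independent position and limit, this version does not depend on whether the checksum routine advances the position of the buffer it reads from.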



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java:
##########
@@ -63,13 +64,17 @@
  * +---------------+---------+--------------------------+---------+----------------+
  * </pre>
  *
+ * <p>In addition to regular Raft log entries, payload can also represent a special type of entry which are written when Raft suffix
+ * is truncated. Such entries are identified by having a payload length of 0, followed by 8 bytes of the last log index kept after the
+ * truncation.
+ *
  * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is
  * written at the end of the file. If there are less than 8 bytes left, no switch records are written.
  */
 class SegmentFileManager implements ManuallyCloseable {
     private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000;
 
-    private static final int MAGIC_NUMBER = 0xFEEDFACE;
+    private static final int MAGIC_NUMBER = 0x56E0B526;

Review Comment:
   Changing the on-disk magic number is a breaking change for existing segment 
files and will prevent opening previously written data without a migration 
path. Please revert to the original magic or bump FORMAT_VERSION and implement 
a read path that accepts the prior magic to maintain backward compatibility.
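One way to follow the second option is to keep the old value as a legacy constant and dispatch on the magic number when a segment file is opened. This is a minimal sketch that assumes, hypothetically, that the magic number occupies the first 4 bytes of the file header; LEGACY_MAGIC_NUMBER, SegmentFormat and detectFormat are illustrative names, not part of the PR. Whether FORMAT_VERSION also needs to be checked depends on the header layout, which the sketch does not model.

```java
import java.nio.ByteBuffer;

class SegmentMagicSketch {
    static final int LEGACY_MAGIC_NUMBER = 0xFEEDFACE; // value before the PR
    static final int MAGIC_NUMBER = 0x56E0B526;        // value introduced by the PR

    enum SegmentFormat { LEGACY, CURRENT }

    // Assumption: the magic number is stored in the first 4 bytes of the segment file header.
    static SegmentFormat detectFormat(ByteBuffer header) {
        int magic = header.getInt(0);

        if (magic == MAGIC_NUMBER) {
            return SegmentFormat.CURRENT;
        }

        if (magic == LEGACY_MAGIC_NUMBER) {
            // Files written before this change cannot contain truncate-suffix records.
            return SegmentFormat.LEGACY;
        }

        throw new IllegalStateException(String.format("Unknown segment file magic number: 0x%08X", magic));
    }

    public static void main(String[] args) {
        ByteBuffer legacy = ByteBuffer.allocate(8).putInt(0, LEGACY_MAGIC_NUMBER);
        ByteBuffer current = ByteBuffer.allocate(8).putInt(0, MAGIC_NUMBER);

        System.out.println(detectFormat(legacy));  // LEGACY
        System.out.println(detectFormat(current)); // CURRENT
    }
}
```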



##########
modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManagerGetEntryTest.java:
##########
@@ -151,67 +163,183 @@ void getEntryMultithreadedTest() throws IOException {
         runRace(tasks.toArray(RunnableX[]::new));
 
         // Validate that data was actually inserted.
-        for (Map.Entry<Long, List<MockEntry>> e : entriesByGroupId.entrySet()) {
+        for (Map.Entry<Long, List<LogEntry>> e : entriesByGroupId.entrySet()) {
             long groupId = e.getKey();
 
-            for (MockEntry entry : e.getValue()) {
-                LogEntry actualEntry = fileManager.getEntry(groupId, entry.logIndex(), entry.decoder);
+            for (LogEntry entry : e.getValue()) {
+                LogEntry actualEntry = fileManager.getEntry(groupId, entry.getId().getIndex(), decoder);
 
-                assertThat(actualEntry, is(entry.logEntry));
+                assertThat(actualEntry, is(entry));
             }
         }
     }
 
-    private static Map<Long, List<MockEntry>> generateEntries(int numGroups, int numEntriesPerGroup, int entrySize) {
-        Map<Long, List<MockEntry>> entriesByGroupId = newHashMap(numGroups);
+    @Test
+    void testGetEntryWithSuffixTruncate() throws IOException {
+        int entrySize = FILE_SIZE / 10;
+
+        int numEntries = 100;
+
+        long curLogIndex = 0;
+
+        for (int i = 0; i < numEntries; i++) {
+            var entry = createLogEntry(curLogIndex, entrySize);
+
+            fileManager.appendEntry(0, entry, encoder);
+
+            if (i > 0 && i % 10 == 0) {
+                curLogIndex -= 4;
+
+                fileManager.truncateSuffix(0, curLogIndex);
+
+                // Check that the "lastIndexKept" entry is accessible, while the truncated one is not.
+                assertThat(fileManager.getEntry(0, curLogIndex, decoder), is(notNullValue()));
+
+                assertThat(fileManager.getEntry(0, curLogIndex + 1, decoder), is(nullValue()));
+            }
+
+            curLogIndex++;
+        }
+    }
+
+    @RepeatedTest(5)
+    void truncateSuffixMultithreadedTest() throws IOException {
+        int numGroups = STRIPES;
+
+        int entrySize = 10;
+
+        int numEntriesPerGroup = 100;
+
+        Map<Long, List<LogEntry>> entriesByGroupId = generateEntries(numGroups, numEntriesPerGroup, entrySize);
+
+        // Entries that will be used to replace truncated entries.
+        var replacementEntriesByGroupId = new HashMap<Long, List<LogEntry>>();
+
+        int numReplacementEntriesPerGroup = numEntriesPerGroup / 5;
 
         for (long groupId = 0; groupId < numGroups; groupId++) {
-            var entries = new ArrayList<MockEntry>(numEntriesPerGroup);
+            var entries = new ArrayList<LogEntry>(numReplacementEntriesPerGroup);
 
-            for (int i = 0; i < numEntriesPerGroup; i++) {
-                entries.add(new MockEntry(i, entrySize));
+            for (int i = 0; i < numReplacementEntriesPerGroup; i++) {
+                entries.add(createLogEntry(i * 5, entrySize));
             }
 
-            entriesByGroupId.put(groupId, entries);
+            replacementEntriesByGroupId.put(groupId, entries);
         }
 
-        return entriesByGroupId;
-    }
+        var tasks = new ArrayList<RunnableX>();
 
-    private static class MockEntry {
-        private final LogEntry logEntry = mock(LogEntry.class);
+        for (int i = 0; i < numGroups; i++) {
+            long groupId = i;
 
-        private final LogEntryEncoder encoder;
+            List<LogEntry> entries = entriesByGroupId.get(groupId);
 
-        private final LogEntryDecoder decoder;
+            List<LogEntry> replacementEntries = replacementEntriesByGroupId.get(groupId);
 
-        MockEntry(long logIndex, int entrySize) {
-            when(logEntry.getId()).thenReturn(new LogId(logIndex, 0));
+            tasks.add(() -> {
+                for (int entryIndex = 0; entryIndex < entries.size(); entryIndex++) {
+                    LogEntry entry = entries.get(entryIndex);
 
-            byte[] bytes = randomBytes(ThreadLocalRandom.current(), entrySize);
+                    fileManager.appendEntry(groupId, entry, encoder);
 
-            encoder = new LogEntryEncoder() {
-                @Override
-                public byte[] encode(LogEntry log) {
-                    throw new UnsupportedOperationException();
-                }
+                    // Truncate every 5th entry.
+                    if (entryIndex % 5 == 0) {
+                        fileManager.truncateSuffix(groupId, entryIndex - 1);
 
-                @Override
-                public void encode(ByteBuffer buffer, LogEntry log) {
-                    buffer.put(bytes);
+                        LogEntry replacementEntry = replacementEntries.get(entryIndex / 5);
+
+                        fileManager.appendEntry(groupId, replacementEntry, encoder);
+                    }
                 }
+            });
+        }
+
+        for (int i = 0; i < numGroups; i++) {
+            long groupId = i;
+
+            List<LogEntry> entries = entriesByGroupId.get(groupId);
 
-                @Override
-                public int size(LogEntry logEntry) {
-                    return entrySize;
+            List<LogEntry> replacementEntries = replacementEntriesByGroupId.get(groupId);
+
+            RunnableX reader = () -> {
+                for (int logIndex = 0; logIndex < entries.size(); logIndex++) {
+                    LogEntry actualEntry = fileManager.getEntry(groupId, logIndex, decoder);
+
+                    if (actualEntry == null) {
+                        continue;
+                    }
+
+                    LogEntry expectedEntry = entries.get(logIndex);
+
+                    if (logIndex % 5 == 0) {
+                        // Here we can read both the truncated and the replacement entry.
+                        LogEntry replacementEntry = replacementEntries.get(logIndex / 5);
+
+                        assertThat(actualEntry, either(sameInstance(replacementEntry)).or(sameInstance(expectedEntry)));
+                    } else {
+                        assertThat(actualEntry, is(sameInstance(expectedEntry)));
+                    }
                 }
             };
 
-            decoder = bs -> logEntry;
+            // Two readers per every group.
+            tasks.add(reader);
+            tasks.add(reader);
+        }
+
+        runRace(tasks.toArray(RunnableX[]::new));
+
+        // Validate that data was actually inserted.
+        for (long groupId = 0; groupId < numGroups; groupId++) {
+            List<LogEntry> entries = entriesByGroupId.get(groupId);
+
+            List<LogEntry> replacementEntries = replacementEntriesByGroupId.get(groupId);
+
+            for (int logIndex = 0; logIndex < entries.size(); logIndex++) {
+                LogEntry expectedEntry = logIndex % 5 == 0 ? replacementEntries.get(logIndex / 5) : entries.get(logIndex);
+
+                LogEntry actualEntry = fileManager.getEntry(groupId, logIndex, decoder);
+
+                assertThat(actualEntry, is(sameInstance(expectedEntry)));
+            }
         }
+    }
+
+    private Map<Long, List<LogEntry>> generateEntries(int numGroups, int numEntriesPerGroup, int entrySize) {
+        Map<Long, List<LogEntry>> entriesByGroupId = newHashMap(numGroups);
+
+        for (long groupId = 0; groupId < numGroups; groupId++) {
+            var entries = new ArrayList<LogEntry>(numEntriesPerGroup);
+
+            for (int i = 0; i < numEntriesPerGroup; i++) {
+                entries.add(createLogEntry(i, entrySize));
+            }
 
-        long logIndex() {
-            return logEntry.getId().getIndex();
+            entriesByGroupId.put(groupId, entries);
         }
+
+        return entriesByGroupId;
+    }
+
+    private LogEntry createLogEntry(long logIndex, int entrySize) {
+        LogEntry logEntry = new LogEntry();
+
+        logEntry.setId(new LogId(logIndex, 0));
+
+        byte[] bytes = randomBytes(ThreadLocalRandom.current(), entrySize);
+
+        lenient().doAnswer(invocationOnMock -> {
+            ByteBuffer buffer = invocationOnMock.getArgument(0);
+
+            buffer.put(bytes);
+
+            return null;
+        }).when(encoder).encode(any(), same(logEntry));
+
+        lenient().when(encoder.size(same(logEntry))).thenReturn(entrySize);
+
+        lenient().when(decoder.decode(bytes)).thenReturn(logEntry);
+
+        return logEntry;

Review Comment:
   Stubbing decoder.decode(bytes) matches only by the byte[] reference;
SegmentPayload.readFrom allocates a new array, so the stub won't match and
getEntry will return null. Use content-based matching, for example:
lenient().when(decoder.decode(argThat(arr -> Arrays.equals(arr, bytes)))).thenReturn(logEntry);
and add imports for org.mockito.ArgumentMatchers.argThat and java.util.Arrays.
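The difference between reference and content equality for byte[] can be shown without Mockito. The sketch below uses a hypothetical hand-written FakeDecoder (not part of the test) that keys stubbed results by payload content, so a freshly allocated copy of the bytes, like the one readFrom is described as returning, still resolves. In Mockito, argThat(arr -> Arrays.equals(arr, bytes)) or AdditionalMatchers.aryEq(bytes) expresses the same content match. A hedged caveat: as far as we know, Mockito already wraps raw array arguments in an array-equality matcher when a stub is recorded, so it may be worth confirming that the original stub actually fails before changing it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ContentMatchingSketch {
    // Hypothetical fake decoder: returns the value registered for a payload with the same content.
    static final class FakeDecoder<T> {
        private final Map<ByteBuffer, T> byContent = new HashMap<>();

        void register(byte[] payload, T value) {
            // Clone so that later changes to the caller's array cannot alter the key.
            byContent.put(ByteBuffer.wrap(payload.clone()), value);
        }

        T decode(byte[] payload) {
            // ByteBuffer.equals/hashCode compare the remaining bytes, so lookup is by content.
            return byContent.get(ByteBuffer.wrap(payload));
        }
    }

    public static void main(String[] args) {
        byte[] written = {1, 2, 3};
        // Simulates a reader that allocates a new array for the payload it reads back.
        byte[] readBack = Arrays.copyOf(written, written.length);

        System.out.println(written.equals(readBack));         // false: arrays use identity equals
        System.out.println(Arrays.equals(written, readBack)); // true: same content

        FakeDecoder<String> decoder = new FakeDecoder<>();
        decoder.register(written, "entry-0");
        System.out.println(decoder.decode(readBack));         // entry-0
    }
}
```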



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentInfo.java:
##########
@@ -56,6 +58,17 @@ ArrayWithSize add(int element) {
             return new ArrayWithSize(array, size + 1);
         }
 
+        ArrayWithSize truncate(int newSize) {
+            assert newSize <= size
+                    : String.format("Array must shrink on truncation, current size: %d, size after truncation: %d", size, newSize);
+
+            int[] newArray = new int[size];
+
+            System.arraycopy(array, 0, newArray, 0, newSize);
+
+            return new ArrayWithSize(newArray, newSize);
+        }

Review Comment:
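   The new array is allocated with length size (the size before truncation)
instead of newSize, so size - newSize slots of every truncated copy are
wasted. Allocate exactly newSize: int[] newArray = new int[newSize];
(Arrays.copyOf(array, newSize) does the allocation and the copy in one call).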
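A sketch of the corrected method on a minimal, hypothetical stand-in for ArrayWithSize that keeps only what the hunk shows: an int[] backing array, a logical size, and an add method that reuses the backing array when it has room. The doubling growth policy in add is an assumption; the PR's real capacity handling is not visible here. The copy in truncate appears to be needed because add writes into a shared backing array, so a truncated instance that shared it could overwrite entries still visible through the pre-truncation instance.

```java
import java.util.Arrays;

class ArrayWithSizeSketch {
    // Hypothetical stand-in for SegmentInfo.ArrayWithSize.
    static final class ArrayWithSize {
        private final int[] array;
        private final int size;

        ArrayWithSize(int[] array, int size) {
            this.array = array;
            this.size = size;
        }

        ArrayWithSize add(int element) {
            // Assumed growth policy: reuse the backing array if it has room, otherwise double it.
            int[] target = size < array.length ? array : Arrays.copyOf(array, Math.max(1, array.length * 2));
            target[size] = element;
            return new ArrayWithSize(target, size + 1);
        }

        ArrayWithSize truncate(int newSize) {
            assert newSize <= size
                    : String.format("Array must shrink on truncation, current size: %d, size after truncation: %d", size, newSize);

            // Allocates exactly newSize slots and copies the first newSize elements.
            return new ArrayWithSize(Arrays.copyOf(array, newSize), newSize);
        }

        int size() { return size; }

        int capacity() { return array.length; }

        int get(int index) { return array[index]; }
    }

    public static void main(String[] args) {
        ArrayWithSize a = new ArrayWithSize(new int[0], 0);
        for (int i = 0; i < 5; i++) {
            a = a.add(i * 10);
        }

        ArrayWithSize truncated = a.truncate(2);
        truncated.add(99); // must not affect 'a'

        System.out.println(a.capacity());         // 8
        System.out.println(truncated.capacity()); // 2
        System.out.println(a.get(2));             // 20
    }
}
```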



-- 
This is an automated message from the Apache Git Service.