rpuch commented on code in PR #6577: URL: https://github.com/apache/ignite-3/pull/6577#discussion_r2341010532
########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IgniteLogStorage.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; +import org.apache.ignite.internal.util.FastCrc; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder; +import org.apache.ignite.raft.jraft.option.LogStorageOptions; +import org.apache.ignite.raft.jraft.storage.LogStorage; + +/** + * Ignite's {@link LogStorage} implementation. + * + * <p>Every storage instance is associated with a single Raft group, but multiple storage instances can share the same + * {@link SegmentFileManager} instance meaning that they can share the same segment files. + * + * <p>Every appended entry is converted into its serialized form (a.k.a. "payload"), defined by a {@link LogEntryEncoder}, + * and stored in a segment file. + * + * <p>Binary representation of each entry is as follows: + * <pre> + * +---------------+---------+--------------------------+---------+----------------+ + * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Payload | Hash (4 bytes) | + * +---------------+---------+--------------------------+---------+----------------+ + * </pre> + */ +class IgniteLogStorage implements LogStorage { + static final int GROUP_ID_SIZE_BYTES = Long.BYTES; + + static final int LENGTH_SIZE_BYTES = Integer.BYTES; + + static final int HASH_SIZE = Integer.BYTES; + + private final long groupId; + + private final SegmentFileManager segmentFileManager; + + private volatile LogEntryEncoder logEntryEncoder; + + IgniteLogStorage(long groupId, SegmentFileManager segmentFileManager) { + if (groupId <= 0) { + throw new IllegalArgumentException("groupId must be greater than 0: " + groupId); + } + + this.groupId = groupId; + this.segmentFileManager = segmentFileManager; + } + + @Override + public boolean init(LogStorageOptions opts) { + logEntryEncoder = opts.getLogEntryCodecFactory().encoder(); + + return true; + } + + @Override + public boolean appendEntry(LogEntry entry) { + byte[] bytes = logEntryEncoder.encode(entry); + + try (WriteBuffer writeBuffer = segmentFileManager.reserve(entrySize(bytes))) { + writeEntry(writeBuffer, bytes); + } catch (Exception e) { + throw new IgniteInternalException(INTERNAL_ERR, e); + } + + return true; + } + + private void writeEntry(WriteBuffer writeBuffer, byte[] payload) { + ByteBuffer buffer = writeBuffer.buffer(); + + int pos = buffer.position(); Review Comment: ```suggestion int originalPos = buffer.position(); ``` ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; + +/** + * File manager responsible for allocating and maintaining a pointer to the current segment file. + * + * <p>When the current segment file becomes full, that is, it does not contain enough bytes left to satisfy a request by one of the writer + * threads, then a new segment file is allocated and is atomically switched to be the current one. This operation is called rollover. + * + * <p>Every segment file has the following structure: + * <pre> + * +------------------+---------+-----+---------+ + * | Header (8 bytes) | Payload | ... | Payload | + * +------------------+---------+-----+---------+ + * </pre> + * + * <p>Header structure is the following: + * <pre> + * +------------------------+-------------------+ + * | Magic number (4 bytes) | Version (4 bytes) | + * +------------------------+-------------------+ + * </pre> + * + * <p>Payload structure is defined by the outer callers. + * + * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is + * written at the end of the file. If there are less than 8 bytes left, no switch records are written. + */ +class SegmentFileManager implements ManuallyCloseable { + private static final int WAIT_TIMEOUT_MS = 30_000; + + private static final int MAGIC_NUMBER = 0xFEEDFACE; + + private static final int FORMAT_VERSION = 1; + + static final byte[] HEADER_RECORD = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES) + .order(SegmentFile.BYTE_ORDER) + .putInt(MAGIC_NUMBER) + .putInt(FORMAT_VERSION) + .array(); + + static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; Review Comment: Please add a javadoc clarifying that it's what it looks like: namely, 8 zero bytes ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; + +/** + * File manager responsible for allocating and maintaining a pointer to the current segment file. + * + * <p>When the current segment file becomes full, that is, it does not contain enough bytes left to satisfy a request by one of the writer + * threads, then a new segment file is allocated and is atomically switched to be the current one. This operation is called rollover. + * + * <p>Every segment file has the following structure: + * <pre> + * +------------------+---------+-----+---------+ + * | Header (8 bytes) | Payload | ... | Payload | + * +------------------+---------+-----+---------+ + * </pre> + * + * <p>Header structure is the following: + * <pre> + * +------------------------+-------------------+ + * | Magic number (4 bytes) | Version (4 bytes) | + * +------------------------+-------------------+ + * </pre> + * + * <p>Payload structure is defined by the outer callers. + * + * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is + * written at the end of the file. If there are less than 8 bytes left, no switch records are written. + */ +class SegmentFileManager implements ManuallyCloseable { + private static final int WAIT_TIMEOUT_MS = 30_000; Review Comment: ```suggestion private static final int ROLLOVER_WAIT_TIMEOUT_MS = 30_000; ``` ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; + +/** + * File manager responsible for allocating and maintaining a pointer to the current segment file. + * + * <p>When the current segment file becomes full, that is, it does not contain enough bytes left to satisfy a request by one of the writer + * threads, then a new segment file is allocated and is atomically switched to be the current one. This operation is called rollover. + * + * <p>Every segment file has the following structure: + * <pre> + * +------------------+---------+-----+---------+ + * | Header (8 bytes) | Payload | ... | Payload | + * +------------------+---------+-----+---------+ + * </pre> + * + * <p>Header structure is the following: + * <pre> + * +------------------------+-------------------+ + * | Magic number (4 bytes) | Version (4 bytes) | + * +------------------------+-------------------+ + * </pre> + * + * <p>Payload structure is defined by the outer callers. + * + * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is + * written at the end of the file. If there are less than 8 bytes left, no switch records are written. + */ +class SegmentFileManager implements ManuallyCloseable { + private static final int WAIT_TIMEOUT_MS = 30_000; + + private static final int MAGIC_NUMBER = 0xFEEDFACE; + + private static final int FORMAT_VERSION = 1; + + static final byte[] HEADER_RECORD = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES) + .order(SegmentFile.BYTE_ORDER) + .putInt(MAGIC_NUMBER) + .putInt(FORMAT_VERSION) + .array(); + + static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; + + private final Path baseDir; + + /** Configured size of a segment file. */ + private final long fileSize; + + /** + * Current segment file. Can store {@code null} while a rollover is in progress. + */ + private final AtomicReference<SegmentFile> currentSegmentFile = new AtomicReference<>(); + + /** Lock used to block threads while a rollover is in progress. */ + private final Object rolloverLock = new Object(); + + /** + * Current segment file index (used to generate segment file names). + * + * <p>Must always be accessed under the {@link #rolloverLock}. + */ + private int curFileIndex; + + /** + * Flag indicating whether the file manager has been stopped. + * + * <p>Must always be accessed under the {@link #rolloverLock}. + */ + private boolean isStopped; + + SegmentFileManager(Path baseDir, long fileSize) { + this.baseDir = baseDir; + this.fileSize = fileSize; + } + + void start() throws IOException { + // TODO: implement recovery, see https://issues.apache.org/jira/browse/IGNITE-26283. + currentSegmentFile.set(allocateNewSegmentFile(0)); + } + + private SegmentFile allocateNewSegmentFile(int fileIndex) throws IOException { + Path path = baseDir.resolve(segmentFileName(fileIndex, 0)); + + var segmentFile = new SegmentFile(path, fileSize, 0); + + writeHeader(segmentFile); + + return segmentFile; + } + + private static String segmentFileName(int fileIndex, int generation) { + return String.format("segment-%010d-%010d.bin", fileIndex, generation); + } + + WriteBuffer reserve(int size) throws Exception { + if (size > maxEntrySize()) { + throw new IllegalArgumentException("Entry size is too big: " + size); + } + + while (true) { + SegmentFile segmentFile = currentSegmentFile(); Review Comment: This method may throw an `InterruptedException` at the caller, but it doesn't advertise this. Probably it should restore the interruption status if such an exception is thrown ########## modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/segstore/IgniteLogStorageTest.java: ########## @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static java.util.Collections.nCopies; +import static java.util.stream.Collectors.toList; +import static org.apache.ignite.internal.raft.storage.segstore.IgniteLogStorage.GROUP_ID_SIZE_BYTES; +import static org.apache.ignite.internal.raft.storage.segstore.IgniteLogStorage.HASH_SIZE; +import static org.apache.ignite.internal.raft.storage.segstore.IgniteLogStorage.LENGTH_SIZE_BYTES; +import static org.apache.ignite.internal.raft.storage.segstore.IgniteLogStorage.entrySize; +import static org.apache.ignite.internal.raft.storage.segstore.SegmentFileManager.SWITCH_SEGMENT_RECORD; +import static org.apache.ignite.internal.util.IgniteUtils.closeAllManually; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; +import org.apache.ignite.internal.testframework.IgniteAbstractTest; +import org.apache.ignite.internal.util.FastCrc; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryCodecFactory; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder; +import org.apache.ignite.raft.jraft.option.LogStorageOptions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class IgniteLogStorageTest extends IgniteAbstractTest { + private static final int SEGMENT_SIZE = 1024; + + private static final long GROUP_ID = 1000; + + private IgniteLogStorage logStorage; + + private SegmentFileManager segmentFileManager; + + @Mock + private LogEntryEncoder encoder; + + @BeforeEach + void setUp() throws IOException { + segmentFileManager = new SegmentFileManager(workDir, SEGMENT_SIZE); + + logStorage = new IgniteLogStorage(GROUP_ID, segmentFileManager); + + var opts = new LogStorageOptions(); + + opts.setLogEntryCodecFactory(new LogEntryCodecFactory() { + @Override + public LogEntryEncoder encoder() { + return encoder; + } + + @Override + public LogEntryDecoder decoder() { + return null; + } + }); + + segmentFileManager.start(); + + logStorage.init(opts); + } + + @AfterEach + void tearDown() throws Exception { + closeAllManually(segmentFileManager, logStorage::shutdown); Review Comment: Should logStorage be closed first? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFile.java: ########## @@ -35,6 +36,11 @@ * <p>This implementation is thread-safe in terms of concurrent writes. */ class SegmentFile implements ManuallyCloseable { + /** + * Byte order of the buffers used by {@link WriteBuffer#buffer}. + */ + static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder(); Review Comment: Why do we need native order here? Are we going to write Raft logs differently, depending on the machine endianness? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; + +/** + * File manager responsible for allocating and maintaining a pointer to the current segment file. + * + * <p>When the current segment file becomes full, that is, it does not contain enough bytes left to satisfy a request by one of the writer + * threads, then a new segment file is allocated and is atomically switched to be the current one. This operation is called rollover. + * + * <p>Every segment file has the following structure: + * <pre> + * +------------------+---------+-----+---------+ + * | Header (8 bytes) | Payload | ... | Payload | + * +------------------+---------+-----+---------+ + * </pre> + * + * <p>Header structure is the following: + * <pre> + * +------------------------+-------------------+ + * | Magic number (4 bytes) | Version (4 bytes) | + * +------------------------+-------------------+ + * </pre> + * + * <p>Payload structure is defined by the outer callers. + * + * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is + * written at the end of the file. If there are less than 8 bytes left, no switch records are written. + */ +class SegmentFileManager implements ManuallyCloseable { + private static final int WAIT_TIMEOUT_MS = 30_000; + + private static final int MAGIC_NUMBER = 0xFEEDFACE; + + private static final int FORMAT_VERSION = 1; + + static final byte[] HEADER_RECORD = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES) + .order(SegmentFile.BYTE_ORDER) + .putInt(MAGIC_NUMBER) + .putInt(FORMAT_VERSION) + .array(); + + static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; + + private final Path baseDir; + + /** Configured size of a segment file. */ + private final long fileSize; + + /** + * Current segment file. Can store {@code null} while a rollover is in progress. + */ + private final AtomicReference<SegmentFile> currentSegmentFile = new AtomicReference<>(); + + /** Lock used to block threads while a rollover is in progress. */ + private final Object rolloverLock = new Object(); + + /** + * Current segment file index (used to generate segment file names). + * + * <p>Must always be accessed under the {@link #rolloverLock}. + */ + private int curFileIndex; + + /** + * Flag indicating whether the file manager has been stopped. + * + * <p>Must always be accessed under the {@link #rolloverLock}. + */ + private boolean isStopped; + + SegmentFileManager(Path baseDir, long fileSize) { + this.baseDir = baseDir; + this.fileSize = fileSize; + } + + void start() throws IOException { + // TODO: implement recovery, see https://issues.apache.org/jira/browse/IGNITE-26283. + currentSegmentFile.set(allocateNewSegmentFile(0)); + } + + private SegmentFile allocateNewSegmentFile(int fileIndex) throws IOException { + Path path = baseDir.resolve(segmentFileName(fileIndex, 0)); + + var segmentFile = new SegmentFile(path, fileSize, 0); + + writeHeader(segmentFile); + + return segmentFile; + } + + private static String segmentFileName(int fileIndex, int generation) { + return String.format("segment-%010d-%010d.bin", fileIndex, generation); Review Comment: Let's extract this to a constant to make it more easily discoverable. Also, why is it 10 positions? In the ticket, it seems to be 8. ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IgniteLogStorage.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; +import org.apache.ignite.internal.util.FastCrc; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder; +import org.apache.ignite.raft.jraft.option.LogStorageOptions; +import org.apache.ignite.raft.jraft.storage.LogStorage; + +/** + * Ignite's {@link LogStorage} implementation. + * + * <p>Every storage instance is associated with a single Raft group, but multiple storage instances can share the same + * {@link SegmentFileManager} instance meaning that they can share the same segment files. + * + * <p>Every appended entry is converted into its serialized form (a.k.a. "payload"), defined by a {@link LogEntryEncoder}, + * and stored in a segment file. + * + * <p>Binary representation of each entry is as follows: + * <pre> + * +---------------+---------+--------------------------+---------+----------------+ + * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Payload | Hash (4 bytes) | + * +---------------+---------+--------------------------+---------+----------------+ + * </pre> + */ +class IgniteLogStorage implements LogStorage { + static final int GROUP_ID_SIZE_BYTES = Long.BYTES; + + static final int LENGTH_SIZE_BYTES = Integer.BYTES; + + static final int HASH_SIZE = Integer.BYTES; + + private final long groupId; + + private final SegmentFileManager segmentFileManager; + + private volatile LogEntryEncoder logEntryEncoder; + + IgniteLogStorage(long groupId, SegmentFileManager segmentFileManager) { + if (groupId <= 0) { + throw new IllegalArgumentException("groupId must be greater than 0: " + groupId); + } + + this.groupId = groupId; + this.segmentFileManager = segmentFileManager; + } + + @Override + public boolean init(LogStorageOptions opts) { + logEntryEncoder = opts.getLogEntryCodecFactory().encoder(); + + return true; + } + + @Override + public boolean appendEntry(LogEntry entry) { + byte[] bytes = logEntryEncoder.encode(entry); + + try (WriteBuffer writeBuffer = segmentFileManager.reserve(entrySize(bytes))) { + writeEntry(writeBuffer, bytes); + } catch (Exception e) { + throw new IgniteInternalException(INTERNAL_ERR, e); + } + + return true; + } + + private void writeEntry(WriteBuffer writeBuffer, byte[] payload) { + ByteBuffer buffer = writeBuffer.buffer(); + + int pos = buffer.position(); + + buffer + .putLong(groupId) + .putInt(payload.length) + .put(payload); + + int dataSize = buffer.position() - pos; + + // Rewind the position for CRC calculation. + buffer.position(pos); + + int crc = FastCrc.calcCrc(buffer, dataSize); + + // After CRC calculation, the position will be at the provided end of the buffer. + buffer.putInt(crc); + } + + static int entrySize(byte[] payload) { + return GROUP_ID_SIZE_BYTES + LENGTH_SIZE_BYTES + payload.length + HASH_SIZE; + } + + @Override + public int appendEntries(List<LogEntry> entries) { + entries.forEach(this::appendEntry); + + return entries.size(); + } + + @Override + public long getFirstLogIndex() { + throw new UnsupportedOperationException(); Review Comment: Why? Is this going to be implemented under another ticket? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/IgniteLogStorage.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; +import org.apache.ignite.internal.util.FastCrc; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder; +import org.apache.ignite.raft.jraft.option.LogStorageOptions; +import org.apache.ignite.raft.jraft.storage.LogStorage; + +/** + * Ignite's {@link LogStorage} implementation. + * + * <p>Every storage instance is associated with a single Raft group, but multiple storage instances can share the same + * {@link SegmentFileManager} instance meaning that they can share the same segment files. + * + * <p>Every appended entry is converted into its serialized form (a.k.a. "payload"), defined by a {@link LogEntryEncoder}, + * and stored in a segment file. + * + * <p>Binary representation of each entry is as follows: + * <pre> + * +---------------+---------+--------------------------+---------+----------------+ + * | Raft Group ID (8 bytes) | Payload Length (4 bytes) | Payload | Hash (4 bytes) | + * +---------------+---------+--------------------------+---------+----------------+ + * </pre> + */ +class IgniteLogStorage implements LogStorage { Review Comment: Class name doesn't seem that great. All LogStorage implementations are parts of Ignite. This one is supposed to become the main one, but who knows what happens next. How about calling it `SegstoreLogStorage`? ########## modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/segstore/SegmentFileManager.java: ########## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.segstore; + +import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.internal.close.ManuallyCloseable; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.raft.storage.segstore.SegmentFile.WriteBuffer; + +/** + * File manager responsible for allocating and maintaining a pointer to the current segment file. + * + * <p>When the current segment file becomes full, that is, it does not contain enough bytes left to satisfy a request by one of the writer + * threads, then a new segment file is allocated and is atomically switched to be the current one. This operation is called rollover. + * + * <p>Every segment file has the following structure: + * <pre> + * +------------------+---------+-----+---------+ + * | Header (8 bytes) | Payload | ... | Payload | + * +------------------+---------+-----+---------+ + * </pre> + * + * <p>Header structure is the following: + * <pre> + * +------------------------+-------------------+ + * | Magic number (4 bytes) | Version (4 bytes) | + * +------------------------+-------------------+ + * </pre> + * + * <p>Payload structure is defined by the outer callers. + * + * <p>When a rollover happens and the segment file being replaced has at least 8 bytes left, a special {@link #SWITCH_SEGMENT_RECORD} is + * written at the end of the file. If there are less than 8 bytes left, no switch records are written. + */ +class SegmentFileManager implements ManuallyCloseable { + private static final int WAIT_TIMEOUT_MS = 30_000; + + private static final int MAGIC_NUMBER = 0xFEEDFACE; + + private static final int FORMAT_VERSION = 1; + + static final byte[] HEADER_RECORD = ByteBuffer.allocate(Integer.BYTES + Integer.BYTES) + .order(SegmentFile.BYTE_ORDER) + .putInt(MAGIC_NUMBER) + .putInt(FORMAT_VERSION) + .array(); + + static final byte[] SWITCH_SEGMENT_RECORD = new byte[8]; + + private final Path baseDir; + + /** Configured size of a segment file. */ + private final long fileSize; + + /** + * Current segment file. Can store {@code null} while a rollover is in progress. Review Comment: Should the comment also say that it will be `null` on an already stopped instance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
