clolov commented on code in PR #21644:
URL: https://github.com/apache/kafka/pull/21644#discussion_r2895644549
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -622,6 +642,988 @@ public void
testFirstUnstableOffsetWithTransactionalData() throws IOException {
assertEquals(Optional.empty(), log.firstUnstableOffset());
}
+ @Test
+ public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ MemoryRecords records = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.roll();
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+ }
+
+ private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset,
int expectedSize, List<Long> expectedOffsets) throws IOException {
+ FetchDataInfo readInfo = log.read(
+ fetchOffset,
+ 2048,
+ FetchIsolation.HIGH_WATERMARK,
+ false);
+ assertEquals(expectedSize, readInfo.records.sizeInBytes());
+ List<Long> actualOffsets = new ArrayList<>();
+ readInfo.records.records().forEach(record ->
actualOffsets.add(record.offset()));
+ assertEquals(expectedOffsets, actualOffsets);
+ }
+
+ @Test
+ public void testAppendAsLeaderWithRaftLeader() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ log.appendAsLeader(records.apply(0L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(0, log.logStartOffset());
+ assertEquals(3L, log.logEndOffset());
+
+        // Since the raft leader is responsible for assigning offsets, and the LogValidator is bypassed for performance reasons,
+        // the first offset of the MemoryRecords to be appended should equal the next offset in the log
+ assertThrows(UnexpectedAppendOffsetException.class, () ->
log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+        // When the first offset of the MemoryRecords to be appended equals the next offset in the log, the append will succeed
+ log.appendAsLeader(records.apply(3L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(6, log.logEndOffset());
+ }
+
+ @Test
+ public void testAppendInfoFirstOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ List<SimpleRecord> simpleRecords = List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ );
+
+ MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(0, firstAppendInfo.firstOffset());
+
+ LogAppendInfo secondAppendInfo = log.appendAsLeader(
+ LogTestUtils.records(simpleRecords),
+ 0
+ );
+ assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+ log.roll();
+ LogAppendInfo afterRollAppendInfo =
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+ assertEquals(simpleRecords.size() * 2,
afterRollAppendInfo.firstOffset());
+ }
+
+ @Test
+ public void testTruncateBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+ }
+
+ @Test
+ public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws
IOException {
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) ->
log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+ }
+
+ @Test
+ public void testTruncateFullyAndStart() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable
offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager().firstUnstableOffset());
+
+ reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(),
reopened.producerStateManager().activeProducers());
+ assertEquals(1L, reopened.logStartOffset());
+ assertEquals(2L, reopened.logEndOffset());
+ }
+
+ private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog,
Long> truncateFunc) throws IOException {
+ // Verify that truncation below the first unstable offset correctly
+ // resets the producer state. Specifically we are testing the case when
+ // the segment position of the first unstable offset is unknown.
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable
offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager().firstUnstableOffset());
+
+ truncateFunc.accept(reopened, 0L);
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(),
reopened.producerStateManager().activeProducers());
+ }
+
+ @Test
+ public void testHighWatermarkMaintenance() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ // High watermark initialized to 0
+ assertHighWatermark(log, 0L);
+
+ // High watermark not changed by append
+ log.appendAsLeader(records.apply(0L), leaderEpoch);
+ assertHighWatermark(log, 0L);
+
+ // Update high watermark as leader
+ log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
+ assertHighWatermark(log, 1L);
+
+ // Cannot update past the log end offset
+ log.updateHighWatermark(5L);
+ assertHighWatermark(log, 3L);
+
+ // Update high watermark as follower
+ log.appendAsFollower(records.apply(3L), leaderEpoch);
+ log.updateHighWatermark(6L);
+ assertHighWatermark(log, 6L);
+
+ // High watermark should be adjusted by truncation
+ log.truncateTo(3L);
+ assertHighWatermark(log, 3L);
+
+ log.appendAsLeader(records.apply(0L), 0);
+ assertHighWatermark(log, 3L);
+ assertEquals(6L, log.logEndOffset());
+ assertEquals(0L, log.logStartOffset());
+
+ // Full truncation should also reset high watermark
+ log.truncateFullyAndStartAt(4L, Optional.empty());
+ assertEquals(4L, log.logEndOffset());
+ assertEquals(4L, log.logStartOffset());
+ assertHighWatermark(log, 4L);
+ }
+
+ private void assertHighWatermark(UnifiedLog log, long offset) throws
IOException {
+ assertEquals(offset, log.highWatermark());
+ assertValidLogOffsetMetadata(log,
log.fetchOffsetSnapshot().highWatermark());
+ }
+
+ private void assertNonEmptyFetch(UnifiedLog log, long offset,
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertTrue(readInfo.records.sizeInBytes() > 0);
+
+ long upperBoundOffset = switch (isolation) {
+ case LOG_END -> log.logEndOffset();
+ case HIGH_WATERMARK -> log.highWatermark();
+ case TXN_COMMITTED -> log.lastStableOffset();
+ };
+
+ for (Record record : readInfo.records.records())
+ assertTrue(record.offset() < upperBoundOffset);
+
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation
isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertEquals(0, readInfo.records.sizeInBytes());
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ @Test
+ public void testFetchUpToLogEndOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ for (long offset = log.logStartOffset(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testFetchUpToHighWatermark() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(3L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(5L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertHighWatermarkBoundedFetches(UnifiedLog log,
TreeSet<Long> batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset < log.highWatermark();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+
+ for (long offset = log.highWatermark(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testActiveProducers() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Test transactional producer state (open transaction)
+ short producer1Epoch = 5;
+ long producerId1 = 1L;
+ LogTestUtils.appendTransactionalAsLeader(log, producerId1,
producer1Epoch, mockTime).accept(5);
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.of(0L),
+ Optional.empty()
+ );
+
+ // Test transactional producer state (closed transaction)
+ int coordinatorEpoch = 15;
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1,
producer1Epoch, ControlRecordType.COMMIT,
+ mockTime.milliseconds(), coordinatorEpoch, 0,
TransactionVersion.TV_0.featureLevel());
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.empty(),
+ Optional.of(coordinatorEpoch)
+ );
+
+ // Test idempotent producer state
+ short producer2Epoch = 5;
+ long producerId2 = 2L;
+ LogTestUtils.appendIdempotentAsLeader(log, producerId2,
producer2Epoch, mockTime, false).accept(3);
+ assertProducerState(
+ log,
+ producerId2,
+ producer2Epoch,
+ 2,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ private void assertProducerState(
+ UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ int lastSequence,
+ Optional<Long> currentTxnStartOffset,
+ Optional<Integer> coordinatorEpoch
+ ) {
+ Optional<DescribeProducersResponseData.ProducerState> producerStateOpt
= log.activeProducers().stream().filter(p -> p.producerId() ==
producerId).findFirst();
+ assertTrue(producerStateOpt.isPresent());
+
+ DescribeProducersResponseData.ProducerState producerState =
producerStateOpt.get();
+ assertEquals(producerEpoch, producerState.producerEpoch());
+ assertEquals(lastSequence, producerState.lastSequence());
+ assertEquals(currentTxnStartOffset.orElse(-1L),
producerState.currentTxnStartOffset());
+ assertEquals(coordinatorEpoch.orElse(-1),
producerState.coordinatorEpoch());
+ }
+
+ @Test
+ public void testFetchUpToLastStableOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ short epoch = 0;
+
+ long producerId1 = 1L;
+ long producerId2 = 2L;
+
+ Consumer<Integer> appendProducer1 =
LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime);
+ Consumer<Integer> appendProducer2 =
LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime);
+
+ appendProducer1.accept(5);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 3);
+ appendProducer2.accept(2);
+ appendProducer1.accept(4);
+ LogTestUtils.appendNonTransactionalAsLeader(log, 2);
+ appendProducer1.accept(10);
+
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 5L, 8L,
10L, 14L, 16L, 26L, 27L, 28L));
+
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch,
ControlRecordType.COMMIT, mockTime.milliseconds(),
+ 0, 0, TransactionVersion.TV_0.featureLevel());
+ assertEquals(0L, log.lastStableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(8L, log.lastStableOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch,
ControlRecordType.ABORT, mockTime.milliseconds(),
+ 0, 0, TransactionVersion.TV_0.featureLevel());
+ assertEquals(8L, log.lastStableOffset());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(log.logEndOffset(), log.lastStableOffset());
+ assertLsoBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertLsoBoundedFetches(UnifiedLog log, TreeSet<Long>
batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset <
log.lastStableOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
+
+ for (long offset = log.lastStableOffset(); offset <
log.logEndOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
Review Comment:
This is similar to my other comment
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -622,6 +642,988 @@ public void
testFirstUnstableOffsetWithTransactionalData() throws IOException {
assertEquals(Optional.empty(), log.firstUnstableOffset());
}
+ @Test
+ public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ MemoryRecords records = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.roll();
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+ }
+
+ private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset,
int expectedSize, List<Long> expectedOffsets) throws IOException {
+ FetchDataInfo readInfo = log.read(
+ fetchOffset,
+ 2048,
+ FetchIsolation.HIGH_WATERMARK,
+ false);
+ assertEquals(expectedSize, readInfo.records.sizeInBytes());
+ List<Long> actualOffsets = new ArrayList<>();
+ readInfo.records.records().forEach(record ->
actualOffsets.add(record.offset()));
+ assertEquals(expectedOffsets, actualOffsets);
+ }
+
+ @Test
+ public void testAppendAsLeaderWithRaftLeader() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ log.appendAsLeader(records.apply(0L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(0, log.logStartOffset());
+ assertEquals(3L, log.logEndOffset());
+
+        // Since the raft leader is responsible for assigning offsets, and the LogValidator is bypassed for performance reasons,
+        // the first offset of the MemoryRecords to be appended should equal the next offset in the log
+ assertThrows(UnexpectedAppendOffsetException.class, () ->
log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+        // When the first offset of the MemoryRecords to be appended equals the next offset in the log, the append will succeed
+ log.appendAsLeader(records.apply(3L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(6, log.logEndOffset());
+ }
+
+ @Test
+ public void testAppendInfoFirstOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ List<SimpleRecord> simpleRecords = List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ );
+
+ MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(0, firstAppendInfo.firstOffset());
+
+ LogAppendInfo secondAppendInfo = log.appendAsLeader(
+ LogTestUtils.records(simpleRecords),
+ 0
+ );
+ assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+ log.roll();
+ LogAppendInfo afterRollAppendInfo =
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+ assertEquals(simpleRecords.size() * 2,
afterRollAppendInfo.firstOffset());
+ }
+
+ @Test
+ public void testTruncateBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+ }
+
+ @Test
+ public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws
IOException {
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) ->
log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+ }
+
+ @Test
+ public void testTruncateFullyAndStart() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable
offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager().firstUnstableOffset());
+
+ reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(),
reopened.producerStateManager().activeProducers());
+ assertEquals(1L, reopened.logStartOffset());
+ assertEquals(2L, reopened.logEndOffset());
+ }
+
+ private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog,
Long> truncateFunc) throws IOException {
+ // Verify that truncation below the first unstable offset correctly
+ // resets the producer state. Specifically we are testing the case when
+ // the segment position of the first unstable offset is unknown.
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long producerId = 17L;
+ short producerEpoch = 10;
+ int sequence = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ sequence,
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ ), 0);
+ assertEquals(Optional.of(3L), log.firstUnstableOffset());
+
+ // We close and reopen the log to ensure that the first unstable
offset segment
+ // position will be undefined when we truncate the log.
+ log.close();
+
+ UnifiedLog reopened = createLog(logDir, logConfig);
+ assertEquals(Optional.of(new LogOffsetMetadata(3L)),
reopened.producerStateManager().firstUnstableOffset());
+
+ truncateFunc.accept(reopened, 0L);
+ assertEquals(Optional.empty(), reopened.firstUnstableOffset());
+ assertEquals(Map.of(),
reopened.producerStateManager().activeProducers());
+ }
+
+ @Test
+ public void testHighWatermarkMaintenance() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ // High watermark initialized to 0
+ assertHighWatermark(log, 0L);
+
+ // High watermark not changed by append
+ log.appendAsLeader(records.apply(0L), leaderEpoch);
+ assertHighWatermark(log, 0L);
+
+ // Update high watermark as leader
+ log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
+ assertHighWatermark(log, 1L);
+
+ // Cannot update past the log end offset
+ log.updateHighWatermark(5L);
+ assertHighWatermark(log, 3L);
+
+ // Update high watermark as follower
+ log.appendAsFollower(records.apply(3L), leaderEpoch);
+ log.updateHighWatermark(6L);
+ assertHighWatermark(log, 6L);
+
+ // High watermark should be adjusted by truncation
+ log.truncateTo(3L);
+ assertHighWatermark(log, 3L);
+
+ log.appendAsLeader(records.apply(0L), 0);
+ assertHighWatermark(log, 3L);
+ assertEquals(6L, log.logEndOffset());
+ assertEquals(0L, log.logStartOffset());
+
+ // Full truncation should also reset high watermark
+ log.truncateFullyAndStartAt(4L, Optional.empty());
+ assertEquals(4L, log.logEndOffset());
+ assertEquals(4L, log.logStartOffset());
+ assertHighWatermark(log, 4L);
+ }
+
+ private void assertHighWatermark(UnifiedLog log, long offset) throws
IOException {
+ assertEquals(offset, log.highWatermark());
+ assertValidLogOffsetMetadata(log,
log.fetchOffsetSnapshot().highWatermark());
+ }
+
+ private void assertNonEmptyFetch(UnifiedLog log, long offset,
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertTrue(readInfo.records.sizeInBytes() > 0);
+
+ long upperBoundOffset = switch (isolation) {
+ case LOG_END -> log.logEndOffset();
+ case HIGH_WATERMARK -> log.highWatermark();
+ case TXN_COMMITTED -> log.lastStableOffset();
+ };
+
+ for (Record record : readInfo.records.records())
+ assertTrue(record.offset() < upperBoundOffset);
+
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation
isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertEquals(0, readInfo.records.sizeInBytes());
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ @Test
+ public void testFetchUpToLogEndOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ for (long offset = log.logStartOffset(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testFetchUpToHighWatermark() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(3L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(5L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertHighWatermarkBoundedFetches(UnifiedLog log,
TreeSet<Long> batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset < log.highWatermark();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+
+ for (long offset = log.highWatermark(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testActiveProducers() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ // Test transactional producer state (open transaction)
+ short producer1Epoch = 5;
+ long producerId1 = 1L;
+ LogTestUtils.appendTransactionalAsLeader(log, producerId1,
producer1Epoch, mockTime).accept(5);
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.of(0L),
+ Optional.empty()
+ );
+
+ // Test transactional producer state (closed transaction)
+ int coordinatorEpoch = 15;
+ LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1,
producer1Epoch, ControlRecordType.COMMIT,
+ mockTime.milliseconds(), coordinatorEpoch, 0,
TransactionVersion.TV_0.featureLevel());
+ assertProducerState(
+ log,
+ producerId1,
+ producer1Epoch,
+ 4,
+ Optional.empty(),
+ Optional.of(coordinatorEpoch)
+ );
+
+ // Test idempotent producer state
+ short producer2Epoch = 5;
+ long producerId2 = 2L;
+ LogTestUtils.appendIdempotentAsLeader(log, producerId2,
producer2Epoch, mockTime, false).accept(3);
+ assertProducerState(
+ log,
+ producerId2,
+ producer2Epoch,
+ 2,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ private void assertProducerState(
+ UnifiedLog log,
+ long producerId,
+ short producerEpoch,
+ int lastSequence,
+ Optional<Long> currentTxnStartOffset,
+ Optional<Integer> coordinatorEpoch
+ ) {
+ Optional<DescribeProducersResponseData.ProducerState> producerStateOpt
= log.activeProducers().stream().filter(p -> p.producerId() ==
producerId).findFirst();
+ assertTrue(producerStateOpt.isPresent());
+
+ DescribeProducersResponseData.ProducerState producerState =
producerStateOpt.get();
+ assertEquals(producerEpoch, producerState.producerEpoch());
+ assertEquals(lastSequence, producerState.lastSequence());
+ assertEquals(currentTxnStartOffset.orElse(-1L),
producerState.currentTxnStartOffset());
+ assertEquals(coordinatorEpoch.orElse(-1),
producerState.coordinatorEpoch());
+ }
+
    @Test
    public void testFetchUpToLastStableOffset() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        short epoch = 0;

        long producerId1 = 1L;
        long producerId2 = 2L;

        Consumer<Integer> appendProducer1 = LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime);
        Consumer<Integer> appendProducer2 = LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime);

        // Interleave transactional and non-transactional batches so two
        // transactions (producer1 and producer2) remain open at different offsets.
        appendProducer1.accept(5);
        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
        appendProducer2.accept(2);
        appendProducer1.accept(4);
        LogTestUtils.appendNonTransactionalAsLeader(log, 2);
        appendProducer1.accept(10);

        // Base offsets of every batch appended above, plus positions used by the
        // end-txn markers appended below.
        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 5L, 8L, 10L, 14L, 16L, 26L, 27L, 28L));

        // LSO is still at the first open transaction, so TXN_COMMITTED fetches see nothing.
        assertLsoBoundedFetches(log, batchBaseOffsets);

        log.updateHighWatermark(log.logEndOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);

        // Commit producer1's transaction. The LSO does not advance until the
        // high watermark moves past the marker.
        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(),
            0, 0, TransactionVersion.TV_0.featureLevel());
        assertEquals(0L, log.lastStableOffset());

        log.updateHighWatermark(log.logEndOffset());
        // LSO advances to producer2's still-open transaction start (offset 8).
        assertEquals(8L, log.lastStableOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);

        // Abort producer2's transaction; again the LSO waits on the high watermark.
        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
            0, 0, TransactionVersion.TV_0.featureLevel());
        assertEquals(8L, log.lastStableOffset());

        log.updateHighWatermark(log.logEndOffset());
        // No open transactions remain: the LSO catches up to the LEO.
        assertEquals(log.logEndOffset(), log.lastStableOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);
    }
+
+ private void assertLsoBoundedFetches(UnifiedLog log, TreeSet<Long>
batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset <
log.lastStableOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
+
+ for (long offset = log.lastStableOffset(); offset <
log.logEndOffset(); offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED,
batchBaseOffset);
+ }
+ }
+
    /**
     * Tests for time based log roll. This test appends messages then changes the time
     * using the mock clock to force the log to roll and checks the number of segments.
     */
    @Test
    public void testTimeBasedLogRollDuringAppend() throws IOException {
        Supplier<MemoryRecords> createRecords = () -> LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())));
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();

        // create a log
        UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats, mockTime.scheduler, mockTime,
            new ProducerStateManagerConfig(24 * 60, false), true, Optional.empty(), false);
        assertEquals(1, log.numberOfSegments(), "Log begins with a single empty segment.");
        // Test the segment rolling behavior when messages do not have a timestamp.
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(createRecords.get(), 0);
        assertEquals(1, log.numberOfSegments(), "Log doesn't roll if doing so creates an empty segment.");

        // The first record aged the (previously empty) active segment, so this append rolls.
        log.appendAsLeader(createRecords.get(), 0);
        assertEquals(2, log.numberOfSegments(), "Log rolls on this append since time has expired.");

        for (int numSegments = 3; numSegments < 5; numSegments++) {
            mockTime.sleep(log.config().segmentMs + 1);
            log.appendAsLeader(createRecords.get(), 0);
            assertEquals(numSegments, log.numberOfSegments(), "Changing time beyond rollMs and appending should create a new segment.");
        }

        // Append a message with timestamp to a segment whose first message do not have a timestamp.
        long timestamp = mockTime.milliseconds() + log.config().segmentMs + 1;
        Supplier<MemoryRecords> recordWithTimestamp = () -> LogTestUtils.records(List.of(new SimpleRecord(timestamp, "test".getBytes())));
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(4, log.numberOfSegments(), "Segment should not have been rolled out because the log rolling should be based on wall clock.");

        // Test the segment rolling behavior when messages have timestamps.
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(5, log.numberOfSegments(), "A new segment should have been rolled out");

        // move the wall clock beyond log rolling time
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(5, log.numberOfSegments(), "Log should not roll because the roll should depend on timestamp of the first message.");

        // A record stamped with the current mock time is already older than the
        // segment's first (future-stamped) timestamp plus segmentMs, forcing a roll.
        Supplier<MemoryRecords> recordWithExpiredTimestamp = () -> LogTestUtils.records(List.of(new SimpleRecord(mockTime.milliseconds(), "test".getBytes())));
        log.appendAsLeader(recordWithExpiredTimestamp.get(), 0);
        assertEquals(6, log.numberOfSegments(), "Log should roll because the timestamp in the message should make the log segment expire.");

        int numSegments = log.numberOfSegments();
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0);
        assertEquals(numSegments, log.numberOfSegments(), "Appending an empty message set should not roll log even if sufficient time has passed.");
    }
+
    @Test
    public void testRollSegmentThatAlreadyExists() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
        int partitionLeaderEpoch = 0;

        // create a log
        UnifiedLog log = createLog(logDir, logConfig);
        assertEquals(1, log.numberOfSegments(), "Log begins with a single empty segment.");

        // roll active segment with the same base offset of size zero should recreate the segment
        log.roll(Optional.of(0L));
        assertEquals(1, log.numberOfSegments(), "Expect 1 segment after roll() empty segment with base offset.");

        // should be able to append records to active segment
        MemoryRecords records = LogTestUtils.records(
            List.of(new SimpleRecord(mockTime.milliseconds(), "k1".getBytes(), "v1".getBytes())),
            0L, partitionLeaderEpoch);
        log.appendAsFollower(records, partitionLeaderEpoch);
        assertEquals(1, log.numberOfSegments(), "Expect one segment.");
        assertEquals(0L, log.activeSegment().baseOffset());

        // make sure we can append more records
        MemoryRecords records2 = LogTestUtils.records(
            List.of(new SimpleRecord(mockTime.milliseconds() + 10, "k2".getBytes(), "v2".getBytes())),
            1L, partitionLeaderEpoch);
        log.appendAsFollower(records2, partitionLeaderEpoch);

        assertEquals(2, log.logEndOffset(), "Expect two records in the log");
        // Both records remain readable at their original offsets.
        assertEquals(0, log.read(0, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());
        assertEquals(1, log.read(1, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());

        // roll so that active segment is empty
        log.roll();
        assertEquals(2L, log.activeSegment().baseOffset(), "Expect base offset of active segment to be LEO");
        assertEquals(2, log.numberOfSegments(), "Expect two segments.");

        // manually resize offset index to force roll of an empty active segment on next append;
        // the zero-capacity index makes the append re-roll in place instead of adding a segment
        log.activeSegment().offsetIndex().resize(0);
        MemoryRecords records3 = LogTestUtils.records(
            List.of(new SimpleRecord(mockTime.milliseconds() + 12, "k3".getBytes(), "v3".getBytes())),
            2L, partitionLeaderEpoch);
        log.appendAsFollower(records3, partitionLeaderEpoch);
        // The recreated index has its normal capacity again.
        assertTrue(log.activeSegment().offsetIndex().maxEntries() > 1);
        assertEquals(2, log.read(2, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());
        assertEquals(2, log.numberOfSegments(), "Expect two segments.");
    }
+
+ @Test
+ public void testNonSequentialAppend() throws IOException {
+ // create a log
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ long pid = 1L;
+ short epoch = 0;
+
+ MemoryRecords records = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 0, 0L);
+ log.appendAsLeader(records, 0);
+
+ MemoryRecords nextRecords = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 2, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(nextRecords, 0));
+ }
+
    @Test
    public void testTruncateToEndOffsetClearsEpochCache() throws IOException {
        UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));

        // Seed some initial data in the log: two records with base offset 27,
        // appended as follower with leader epoch 19, leaving the LEO at 29.
        MemoryRecords records = LogTestUtils.records(List.of(new SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
            27, RecordBatch.NO_PARTITION_LEADER_EPOCH);
        appendAsFollower(log, records, 19);
        assertEquals(Optional.of(new EpochEntry(19, 27)), log.leaderEpochCache().latestEntry());
        assertEquals(29, log.logEndOffset());

        // Truncations greater than or equal to the log end offset should
        // clear the epoch cache
        verifyTruncationClearsEpochCache(log, 20, log.logEndOffset());
        verifyTruncationClearsEpochCache(log, 24, log.logEndOffset() + 1);
    }
+
    // Assigns `epoch` at the current end offset, truncates at `truncationOffset`
    // (>= LEO), and verifies only the trivial end-of-log epoch entry is removed.
    // The hard-coded 29 / (19, 27) values match the data seeded by
    // testTruncateToEndOffsetClearsEpochCache, the only caller.
    private void verifyTruncationClearsEpochCache(UnifiedLog log, int epoch, long truncationOffset) {
        // Simulate becoming a leader
        log.assignEpochStartOffset(epoch, log.logEndOffset());
        assertEquals(Optional.of(new EpochEntry(epoch, 29)), log.leaderEpochCache().latestEntry());
        assertEquals(29, log.logEndOffset());

        // Now we become the follower and truncate to an offset greater
        // than or equal to the log end offset. The trivial epoch entry
        // at the end of the log should be gone
        log.truncateTo(truncationOffset);
        assertEquals(Optional.of(new EpochEntry(19, 27)), log.leaderEpochCache().latestEntry());
        assertEquals(29, log.logEndOffset());
    }
+
+ /**
+ * Test the values returned by the logSegments call
+ */
+ @Test
+ public void testLogSegmentsCallCorrect() throws IOException {
+ // Create 3 segments and make sure we get the right values from
various logSegments calls.
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())),
mockTime.milliseconds());
+
+ int setSize = createRecords.get().sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10
messages
+ // create a log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ // segments expire in size
+ for (int i = 1; i < (2 * msgPerSeg + 2); i++) {
+ log.appendAsLeader(createRecords.get(), 0);
+ }
Review Comment:
Similar to my other comment
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -622,6 +642,988 @@ public void
testFirstUnstableOffsetWithTransactionalData() throws IOException {
assertEquals(Optional.empty(), log.firstUnstableOffset());
}
    @Test
    public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);

        MemoryRecords records = LogTestUtils.records(List.of(
            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
        ));

        // High watermark is still 0, so a HIGH_WATERMARK-bounded fetch returns nothing.
        log.appendAsLeader(records, 0);
        assertFetchSizeAndOffsets(log, 0L, 0, List.of());

        // Advancing the high watermark to the LEO exposes all three records.
        log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 1L, 2L));

        // A roll must keep the high watermark metadata valid: the same fetch
        // still returns the same records afterwards.
        log.roll();
        assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L, 1L, 2L));

        // Records appended after the roll sit above the high watermark and are invisible.
        log.appendAsLeader(records, 0);
        assertFetchSizeAndOffsets(log, 3L, 0, List.of());
    }
+
+ private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset,
int expectedSize, List<Long> expectedOffsets) throws IOException {
+ FetchDataInfo readInfo = log.read(
+ fetchOffset,
+ 2048,
+ FetchIsolation.HIGH_WATERMARK,
+ false);
+ assertEquals(expectedSize, readInfo.records.sizeInBytes());
+ List<Long> actualOffsets = new ArrayList<>();
+ readInfo.records.records().forEach(record ->
actualOffsets.add(record.offset()));
+ assertEquals(expectedOffsets, actualOffsets);
+ }
+
    @Test
    public void testAppendAsLeaderWithRaftLeader() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        int leaderEpoch = 0;

        // Builds a 3-record batch whose base offset is pre-assigned by the
        // caller, mimicking a raft leader that assigns offsets itself.
        Function<Long, MemoryRecords> records = offset -> LogTestUtils.records(List.of(
            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, offset, leaderEpoch);

        log.appendAsLeader(records.apply(0L), leaderEpoch, AppendOrigin.RAFT_LEADER);
        assertEquals(0, log.logStartOffset());
        assertEquals(3L, log.logEndOffset());

        // Since raft leader is responsible for assigning offsets, and the LogValidator is bypassed from the performance perspective,
        // so the first offset of the MemoryRecords to be appended should equal to the next offset in the log
        assertThrows(UnexpectedAppendOffsetException.class, () -> log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));

        // When the first offset of the MemoryRecords to be appended equals to the next offset in the log, append will succeed
        log.appendAsLeader(records.apply(3L), leaderEpoch, AppendOrigin.RAFT_LEADER);
        assertEquals(6, log.logEndOffset());
    }
+
+ @Test
+ public void testAppendInfoFirstOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ List<SimpleRecord> simpleRecords = List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ );
+
+ MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(0, firstAppendInfo.firstOffset());
+
+ LogAppendInfo secondAppendInfo = log.appendAsLeader(
+ LogTestUtils.records(simpleRecords),
+ 0
+ );
+ assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+ log.roll();
+ LogAppendInfo afterRollAppendInfo =
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+ assertEquals(simpleRecords.size() * 2,
afterRollAppendInfo.firstOffset());
+ }
+
+ @Test
+ public void testTruncateBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+ }
+
    // Same scenario as testTruncateBelowFirstUnstableOffset, but truncating via
    // truncateFullyAndStartAt instead of truncateTo.
    @Test
    public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws IOException {
        testTruncateBelowFirstUnstableOffset((log, targetOffset) -> log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
    }
+
    @Test
    public void testTruncateFullyAndStart() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        long producerId = 17L;
        short producerEpoch = 10;
        int sequence = 0;

        // Three non-transactional records at offsets 0-2, then an open
        // transaction at offsets 3-4.
        log.appendAsLeader(LogTestUtils.records(List.of(
            new SimpleRecord("0".getBytes()),
            new SimpleRecord("1".getBytes()),
            new SimpleRecord("2".getBytes())
        )), 0);
        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
            Compression.NONE,
            producerId,
            producerEpoch,
            sequence,
            new SimpleRecord("3".getBytes()),
            new SimpleRecord("4".getBytes())
        ), 0);
        assertEquals(Optional.of(3L), log.firstUnstableOffset());

        // We close and reopen the log to ensure that the first unstable offset segment
        // position will be undefined when we truncate the log.
        log.close();

        UnifiedLog reopened = createLog(logDir, logConfig);
        assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager().firstUnstableOffset());

        // Full truncation clears the unstable offset and producer state and
        // adopts the requested start/end offsets.
        reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
        assertEquals(Map.of(), reopened.producerStateManager().activeProducers());
        assertEquals(1L, reopened.logStartOffset());
        assertEquals(2L, reopened.logEndOffset());
    }
+
    private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog, Long> truncateFunc) throws IOException {
        // Verify that truncation below the first unstable offset correctly
        // resets the producer state. Specifically we are testing the case when
        // the segment position of the first unstable offset is unknown.
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        long producerId = 17L;
        short producerEpoch = 10;
        int sequence = 0;

        // Non-transactional data at offsets 0-2, then an open transaction at 3-4.
        log.appendAsLeader(LogTestUtils.records(List.of(
            new SimpleRecord("0".getBytes()),
            new SimpleRecord("1".getBytes()),
            new SimpleRecord("2".getBytes())
        )), 0);
        log.appendAsLeader(MemoryRecords.withTransactionalRecords(
            Compression.NONE,
            producerId,
            producerEpoch,
            sequence,
            new SimpleRecord("3".getBytes()),
            new SimpleRecord("4".getBytes())
        ), 0);
        assertEquals(Optional.of(3L), log.firstUnstableOffset());

        // We close and reopen the log to ensure that the first unstable offset segment
        // position will be undefined when we truncate the log.
        log.close();

        UnifiedLog reopened = createLog(logDir, logConfig);
        assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager().firstUnstableOffset());

        // Truncating below the unstable offset must clear it and drop all producer state.
        truncateFunc.accept(reopened, 0L);
        assertEquals(Optional.empty(), reopened.firstUnstableOffset());
        assertEquals(Map.of(), reopened.producerStateManager().activeProducers());
    }
+
    @Test
    public void testHighWatermarkMaintenance() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        int leaderEpoch = 0;

        // 3-record batch with an explicitly assigned base offset (needed for follower appends).
        Function<Long, MemoryRecords> records = offset -> LogTestUtils.records(List.of(
            new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
            new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
        ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, offset, leaderEpoch);

        // High watermark initialized to 0
        assertHighWatermark(log, 0L);

        // High watermark not changed by append
        log.appendAsLeader(records.apply(0L), leaderEpoch);
        assertHighWatermark(log, 0L);

        // Update high watermark as leader
        log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
        assertHighWatermark(log, 1L);

        // Cannot update past the log end offset (LEO is 3 after the first batch)
        log.updateHighWatermark(5L);
        assertHighWatermark(log, 3L);

        // Update high watermark as follower
        log.appendAsFollower(records.apply(3L), leaderEpoch);
        log.updateHighWatermark(6L);
        assertHighWatermark(log, 6L);

        // High watermark should be adjusted by truncation
        log.truncateTo(3L);
        assertHighWatermark(log, 3L);

        // Re-appending as leader restores the LEO but leaves the high watermark alone.
        log.appendAsLeader(records.apply(0L), 0);
        assertHighWatermark(log, 3L);
        assertEquals(6L, log.logEndOffset());
        assertEquals(0L, log.logStartOffset());

        // Full truncation should also reset high watermark
        log.truncateFullyAndStartAt(4L, Optional.empty());
        assertEquals(4L, log.logEndOffset());
        assertEquals(4L, log.logStartOffset());
        assertHighWatermark(log, 4L);
    }
+
+ private void assertHighWatermark(UnifiedLog log, long offset) throws
IOException {
+ assertEquals(offset, log.highWatermark());
+ assertValidLogOffsetMetadata(log,
log.fetchOffsetSnapshot().highWatermark());
+ }
+
+ private void assertNonEmptyFetch(UnifiedLog log, long offset,
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertTrue(readInfo.records.sizeInBytes() > 0);
+
+ long upperBoundOffset = switch (isolation) {
+ case LOG_END -> log.logEndOffset();
+ case HIGH_WATERMARK -> log.highWatermark();
+ case TXN_COMMITTED -> log.lastStableOffset();
+ };
+
+ for (Record record : readInfo.records.records())
+ assertTrue(record.offset() < upperBoundOffset);
+
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation
isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertEquals(0, readInfo.records.sizeInBytes());
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ @Test
+ public void testFetchUpToLogEndOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ for (long offset = log.logStartOffset(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END,
batchBaseOffset);
+ }
+ }
+
    @Test
    public void testFetchUpToHighWatermark() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);

        // Two batches with base offsets 0 and 3; 5 is the resulting LEO.
        log.appendAsLeader(LogTestUtils.records(List.of(
            new SimpleRecord("0".getBytes()),
            new SimpleRecord("1".getBytes()),
            new SimpleRecord("2".getBytes())
        )), 0);
        log.appendAsLeader(LogTestUtils.records(List.of(
            new SimpleRecord("3".getBytes()),
            new SimpleRecord("4".getBytes())
        )), 0);
        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));

        // High watermark starts at 0: every offset should fetch empty.
        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);

        // Mid-log high watermark: only offsets below 3 return data.
        log.updateHighWatermark(3L);
        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);

        // High watermark at the LEO: all offsets return data.
        log.updateHighWatermark(5L);
        assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
    }
+
    // Verifies HIGH_WATERMARK-isolated reads: offsets below the high watermark
    // return data anchored at their batch base offset, while offsets in
    // [highWatermark, logEndOffset) return empty results.
    private void assertHighWatermarkBoundedFetches(UnifiedLog log, TreeSet<Long> batchBaseOffsets) throws IOException {
        for (long offset = log.logStartOffset(); offset < log.highWatermark(); offset++) {
            Long batchBaseOffset = batchBaseOffsets.floor(offset);
            assertNotNull(batchBaseOffset);
            assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset);
        }

        for (long offset = log.highWatermark(); offset < log.logEndOffset(); offset++) {
            Long batchBaseOffset = batchBaseOffsets.floor(offset);
            assertNotNull(batchBaseOffset);
            assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK, batchBaseOffset);
        }
    }
+
    @Test
    public void testActiveProducers() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);

        // Test transactional producer state (open transaction):
        // 5 transactional records (sequences 0..4) for producer 1.
        short producer1Epoch = 5;
        long producerId1 = 1L;
        LogTestUtils.appendTransactionalAsLeader(log, producerId1, producer1Epoch, mockTime).accept(5);
        assertProducerState(
            log,
            producerId1,
            producer1Epoch,
            4,                  // last sequence of the 5 records appended above
            Optional.of(0L),    // transaction still open, started at offset 0
            Optional.empty()    // no end-txn marker yet
        );

        // Test transactional producer state (closed transaction):
        // a COMMIT marker closes the transaction and records the coordinator epoch.
        int coordinatorEpoch = 15;
        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, producer1Epoch, ControlRecordType.COMMIT,
            mockTime.milliseconds(), coordinatorEpoch, 0, TransactionVersion.TV_0.featureLevel());
        assertProducerState(
            log,
            producerId1,
            producer1Epoch,
            4,
            Optional.empty(),   // transaction closed
            Optional.of(coordinatorEpoch)
        );

        // Test idempotent producer state: 3 records (sequences 0..2), no transaction.
        short producer2Epoch = 5;
        long producerId2 = 2L;
        LogTestUtils.appendIdempotentAsLeader(log, producerId2, producer2Epoch, mockTime, false).accept(3);
        assertProducerState(
            log,
            producerId2,
            producer2Epoch,
            2,                  // last sequence of the 3 records appended above
            Optional.empty(),   // idempotent producer never opens a transaction
            Optional.empty()
        );
    }
+
    // Asserts the DescribeProducers view of a single producer on this log.
    // Absent currentTxnStartOffset / coordinatorEpoch expectations are compared
    // using the -1 sentinel.
    private void assertProducerState(
        UnifiedLog log,
        long producerId,
        short producerEpoch,
        int lastSequence,
        Optional<Long> currentTxnStartOffset,
        Optional<Integer> coordinatorEpoch
    ) {
        Optional<DescribeProducersResponseData.ProducerState> producerStateOpt = log.activeProducers().stream().filter(p -> p.producerId() == producerId).findFirst();
        assertTrue(producerStateOpt.isPresent());

        DescribeProducersResponseData.ProducerState producerState = producerStateOpt.get();
        assertEquals(producerEpoch, producerState.producerEpoch());
        assertEquals(lastSequence, producerState.lastSequence());
        assertEquals(currentTxnStartOffset.orElse(-1L), producerState.currentTxnStartOffset());
        assertEquals(coordinatorEpoch.orElse(-1), producerState.coordinatorEpoch());
    }
+
    @Test
    public void testFetchUpToLastStableOffset() throws IOException {
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
        UnifiedLog log = createLog(logDir, logConfig);
        short epoch = 0;

        long producerId1 = 1L;
        long producerId2 = 2L;

        Consumer<Integer> appendProducer1 = LogTestUtils.appendTransactionalAsLeader(log, producerId1, epoch, mockTime);
        Consumer<Integer> appendProducer2 = LogTestUtils.appendTransactionalAsLeader(log, producerId2, epoch, mockTime);

        // Interleave transactional and non-transactional batches so that two
        // transactions remain open at different offsets.
        appendProducer1.accept(5);
        LogTestUtils.appendNonTransactionalAsLeader(log, 3);
        appendProducer2.accept(2);
        appendProducer1.accept(4);
        LogTestUtils.appendNonTransactionalAsLeader(log, 2);
        appendProducer1.accept(10);

        // Base offsets of the batches appended above, plus the positions used by
        // the end-txn markers appended below.
        TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 5L, 8L, 10L, 14L, 16L, 26L, 27L, 28L));

        // LSO is still at the first open transaction: TXN_COMMITTED fetches see nothing.
        assertLsoBoundedFetches(log, batchBaseOffsets);

        log.updateHighWatermark(log.logEndOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);

        // Commit producer1's transaction; the LSO does not advance until the
        // high watermark moves past the marker.
        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId1, epoch, ControlRecordType.COMMIT, mockTime.milliseconds(),
            0, 0, TransactionVersion.TV_0.featureLevel());
        assertEquals(0L, log.lastStableOffset());

        log.updateHighWatermark(log.logEndOffset());
        // LSO advances to producer2's still-open transaction start (offset 8).
        assertEquals(8L, log.lastStableOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);

        // Abort producer2's transaction; again the LSO waits on the high watermark.
        LogTestUtils.appendEndTxnMarkerAsLeader(log, producerId2, epoch, ControlRecordType.ABORT, mockTime.milliseconds(),
            0, 0, TransactionVersion.TV_0.featureLevel());
        assertEquals(8L, log.lastStableOffset());

        log.updateHighWatermark(log.logEndOffset());
        // No open transactions remain: the LSO catches up to the LEO.
        assertEquals(log.logEndOffset(), log.lastStableOffset());
        assertLsoBoundedFetches(log, batchBaseOffsets);
    }
+
    // Verifies TXN_COMMITTED-isolated reads: offsets below the last stable
    // offset return data anchored at their batch base offset, while offsets in
    // [lastStableOffset, logEndOffset) return empty results.
    private void assertLsoBoundedFetches(UnifiedLog log, TreeSet<Long> batchBaseOffsets) throws IOException {
        for (long offset = log.logStartOffset(); offset < log.lastStableOffset(); offset++) {
            Long batchBaseOffset = batchBaseOffsets.floor(offset);
            assertNotNull(batchBaseOffset);
            assertNonEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset);
        }

        for (long offset = log.lastStableOffset(); offset < log.logEndOffset(); offset++) {
            Long batchBaseOffset = batchBaseOffsets.floor(offset);
            assertNotNull(batchBaseOffset);
            assertEmptyFetch(log, offset, FetchIsolation.TXN_COMMITTED, batchBaseOffset);
        }
    }
+
    /**
     * Tests for time based log roll. This test appends messages then changes the time
     * using the mock clock to force the log to roll and checks the number of segments.
     */
    @Test
    public void testTimeBasedLogRollDuringAppend() throws IOException {
        Supplier<MemoryRecords> createRecords = () -> LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())));
        LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();

        // create a log
        UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats, mockTime.scheduler, mockTime,
            new ProducerStateManagerConfig(24 * 60, false), true, Optional.empty(), false);
        assertEquals(1, log.numberOfSegments(), "Log begins with a single empty segment.");
        // Test the segment rolling behavior when messages do not have a timestamp.
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(createRecords.get(), 0);
        assertEquals(1, log.numberOfSegments(), "Log doesn't roll if doing so creates an empty segment.");

        // The first record aged the (previously empty) active segment, so this append rolls.
        log.appendAsLeader(createRecords.get(), 0);
        assertEquals(2, log.numberOfSegments(), "Log rolls on this append since time has expired.");

        for (int numSegments = 3; numSegments < 5; numSegments++) {
            mockTime.sleep(log.config().segmentMs + 1);
            log.appendAsLeader(createRecords.get(), 0);
            assertEquals(numSegments, log.numberOfSegments(), "Changing time beyond rollMs and appending should create a new segment.");
        }

        // Append a message with timestamp to a segment whose first message do not have a timestamp.
        long timestamp = mockTime.milliseconds() + log.config().segmentMs + 1;
        Supplier<MemoryRecords> recordWithTimestamp = () -> LogTestUtils.records(List.of(new SimpleRecord(timestamp, "test".getBytes())));
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(4, log.numberOfSegments(), "Segment should not have been rolled out because the log rolling should be based on wall clock.");

        // Test the segment rolling behavior when messages have timestamps.
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(5, log.numberOfSegments(), "A new segment should have been rolled out");

        // move the wall clock beyond log rolling time
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(recordWithTimestamp.get(), 0);
        assertEquals(5, log.numberOfSegments(), "Log should not roll because the roll should depend on timestamp of the first message.");

        // A record stamped with the current mock time is already older than the
        // segment's first (future-stamped) timestamp plus segmentMs, forcing a roll.
        Supplier<MemoryRecords> recordWithExpiredTimestamp = () -> LogTestUtils.records(List.of(new SimpleRecord(mockTime.milliseconds(), "test".getBytes())));
        log.appendAsLeader(recordWithExpiredTimestamp.get(), 0);
        assertEquals(6, log.numberOfSegments(), "Log should roll because the timestamp in the message should make the log segment expire.");

        int numSegments = log.numberOfSegments();
        mockTime.sleep(log.config().segmentMs + 1);
        log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE), 0);
        assertEquals(numSegments, log.numberOfSegments(), "Appending an empty message set should not roll log even if sufficient time has passed.");
    }
+
/**
 * Verifies that rolling onto a base offset that already exists (the active segment's own
 * base offset, with size zero) recreates the segment rather than failing, and that the log
 * remains fully usable for appends and reads afterwards.
 */
@Test
public void testRollSegmentThatAlreadyExists() throws IOException {
    // Time-based rolling only (1 hour), so no size-based roll interferes with the test.
    LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentMs(ONE_HOUR).build();
    int partitionLeaderEpoch = 0;

    // create a log
    UnifiedLog log = createLog(logDir, logConfig);
    assertEquals(1, log.numberOfSegments(), "Log begins with a single empty segment.");

    // roll active segment with the same base offset of size zero should recreate the segment
    log.roll(Optional.of(0L));
    assertEquals(1, log.numberOfSegments(), "Expect 1 segment after roll() empty segment with base offset.");

    // should be able to append records to active segment
    // (follower-style append: the batch carries its own base offset 0)
    MemoryRecords records = LogTestUtils.records(
        List.of(new SimpleRecord(mockTime.milliseconds(), "k1".getBytes(), "v1".getBytes())),
        0L, partitionLeaderEpoch);
    log.appendAsFollower(records, partitionLeaderEpoch);
    assertEquals(1, log.numberOfSegments(), "Expect one segment.");
    assertEquals(0L, log.activeSegment().baseOffset());

    // make sure we can append more records (batch with base offset 1 follows on)
    MemoryRecords records2 = LogTestUtils.records(
        List.of(new SimpleRecord(mockTime.milliseconds() + 10, "k2".getBytes(), "v2".getBytes())),
        1L, partitionLeaderEpoch);
    log.appendAsFollower(records2, partitionLeaderEpoch);

    assertEquals(2, log.logEndOffset(), "Expect two records in the log");
    // Each single-record read should return the batch containing exactly that offset.
    assertEquals(0, log.read(0, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());
    assertEquals(1, log.read(1, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());

    // roll so that active segment is empty
    log.roll();
    assertEquals(2L, log.activeSegment().baseOffset(), "Expect base offset of active segment to be LEO");
    assertEquals(2, log.numberOfSegments(), "Expect two segments.");

    // manually resize offset index to force roll of an empty active segment on next append;
    // the roll targets the same base offset (2), exercising the "already exists" path again
    log.activeSegment().offsetIndex().resize(0);
    MemoryRecords records3 = LogTestUtils.records(
        List.of(new SimpleRecord(mockTime.milliseconds() + 12, "k3".getBytes(), "v3".getBytes())),
        2L, partitionLeaderEpoch);
    log.appendAsFollower(records3, partitionLeaderEpoch);
    // The recreated segment's index must have been restored to a usable capacity.
    assertTrue(log.activeSegment().offsetIndex().maxEntries() > 1);
    assertEquals(2, log.read(2, 1, FetchIsolation.LOG_END, true).records.batches().iterator().next().lastOffset());
    assertEquals(2, log.numberOfSegments(), "Expect two segments.");
}
+
+ @Test
+ public void testNonSequentialAppend() throws IOException {
+ // create a log
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+ long pid = 1L;
+ short epoch = 0;
+
+ MemoryRecords records = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 0, 0L);
+ log.appendAsLeader(records, 0);
+
+ MemoryRecords nextRecords = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 2, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(nextRecords, 0));
+ }
+
+ @Test
+ public void testTruncateToEndOffsetClearsEpochCache() throws IOException {
+ UnifiedLog log = createLog(logDir, new LogConfig(new Properties()));
+
+ // Seed some initial data in the log
+ MemoryRecords records = LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes()), new SimpleRecord("b".getBytes())),
+ 27, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ appendAsFollower(log, records, 19);
+ assertEquals(Optional.of(new EpochEntry(19, 27)),
log.leaderEpochCache().latestEntry());
+ assertEquals(29, log.logEndOffset());
+
+ // Truncations greater than or equal to the log end offset should
+ // clear the epoch cache
+ verifyTruncationClearsEpochCache(log, 20, log.logEndOffset());
+ verifyTruncationClearsEpochCache(log, 24, log.logEndOffset() + 1);
+ }
+
/**
 * Simulates a leader assigning a new epoch at the (empty) end of the log, then a
 * follower truncating to {@code truncationOffset}, and asserts the trivial epoch
 * entry is removed while the log itself is untouched.
 *
 * NOTE: the hard-coded offsets (29) and the surviving entry (19 -> 27) match the
 * state seeded by testTruncateToEndOffsetClearsEpochCache; this helper is only
 * meaningful when called from that test.
 *
 * @param epoch            the new leader epoch to assign at the log end offset
 * @param truncationOffset truncation target; must be >= the log end offset
 */
private void verifyTruncationClearsEpochCache(UnifiedLog log, int epoch, long truncationOffset) {
    // Simulate becoming a leader
    log.assignEpochStartOffset(epoch, log.logEndOffset());
    assertEquals(Optional.of(new EpochEntry(epoch, 29)), log.leaderEpochCache().latestEntry());
    assertEquals(29, log.logEndOffset());

    // Now we become the follower and truncate to an offset greater
    // than or equal to the log end offset. The trivial epoch entry
    // at the end of the log should be gone
    log.truncateTo(truncationOffset);
    assertEquals(Optional.of(new EpochEntry(19, 27)), log.leaderEpochCache().latestEntry());
    // Truncating at/past the end must not remove any records.
    assertEquals(29, log.logEndOffset());
}
+
+ /**
+ * Test the values returned by the logSegments call
+ */
+ @Test
+ public void testLogSegmentsCallCorrect() throws IOException {
+ // Create 3 segments and make sure we get the right values from
various logSegments calls.
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(new SimpleRecord("test".getBytes())),
mockTime.milliseconds());
+
+ int setSize = createRecords.get().sizeInBytes();
+ int msgPerSeg = 10;
+ int segmentSize = msgPerSeg * setSize; // each segment will be 10
messages
+ // create a log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(segmentSize).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(1, log.numberOfSegments(), "There should be exactly 1
segment.");
+
+ // segments expire in size
+ for (int i = 1; i < (2 * msgPerSeg + 2); i++) {
+ log.appendAsLeader(createRecords.get(), 0);
+ }
+ assertEquals(3, log.numberOfSegments(), "There should be exactly 3
segments.");
+
+ // from == to should always be null
+ assertEquals(List.of(), getSegmentOffsets(log, 10, 10));
+ assertEquals(List.of(), getSegmentOffsets(log, 15, 15));
+
+ assertEquals(List.of(0L, 10L, 20L), getSegmentOffsets(log, 0, 21));
+
+ assertEquals(List.of(0L), getSegmentOffsets(log, 1, 5));
+ assertEquals(List.of(10L, 20L), getSegmentOffsets(log, 13, 21));
+ assertEquals(List.of(10L), getSegmentOffsets(log, 13, 17));
+
+ // from > to is bad
+ assertThrows(IllegalArgumentException.class, () -> log.logSegments(10,
0));
+ }
+
+ private List<Long> getSegmentOffsets(UnifiedLog log, long from, long to) {
+ return log.logSegments(from,
to).stream().map(LogSegment::baseOffset).toList();
+ }
+
+ @Test
+ public void testInitializationOfProducerSnapshotsUpgradePath() throws
IOException {
+ // simulate the upgrade path by creating a new log with several
segments, deleting the
+ // snapshot files, and then reloading the log
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(64 * 10).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ assertEquals(OptionalLong.empty(), log.oldestProducerSnapshotOffset());
+
+ for (int i = 0; i < 100; i++) {
+ SimpleRecord record = new SimpleRecord(mockTime.milliseconds(),
String.valueOf(i).getBytes());
+ log.appendAsLeader(LogTestUtils.records(List.of(record)), 0);
+ }
Review Comment:
Similar to my other comment
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -622,6 +642,988 @@ public void
testFirstUnstableOffsetWithTransactionalData() throws IOException {
assertEquals(Optional.empty(), log.firstUnstableOffset());
}
+ @Test
+ public void testHighWatermarkMetadataUpdatedAfterSegmentRoll() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ MemoryRecords records = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 0L, 0, List.of());
+
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.roll();
+ assertFetchSizeAndOffsets(log, 0L, records.sizeInBytes(), List.of(0L,
1L, 2L));
+
+ log.appendAsLeader(records, 0);
+ assertFetchSizeAndOffsets(log, 3L, 0, List.of());
+ }
+
+ private void assertFetchSizeAndOffsets(UnifiedLog log, long fetchOffset,
int expectedSize, List<Long> expectedOffsets) throws IOException {
+ FetchDataInfo readInfo = log.read(
+ fetchOffset,
+ 2048,
+ FetchIsolation.HIGH_WATERMARK,
+ false);
+ assertEquals(expectedSize, readInfo.records.sizeInBytes());
+ List<Long> actualOffsets = new ArrayList<>();
+ readInfo.records.records().forEach(record ->
actualOffsets.add(record.offset()));
+ assertEquals(expectedOffsets, actualOffsets);
+ }
+
+ @Test
+ public void testAppendAsLeaderWithRaftLeader() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ int leaderEpoch = 0;
+
+ Function<Long, MemoryRecords> records = offset ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, offset, leaderEpoch);
+
+ log.appendAsLeader(records.apply(0L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(0, log.logStartOffset());
+ assertEquals(3L, log.logEndOffset());
+
+ // Since raft leader is responsible for assigning offsets, and the
LogValidator is bypassed from the performance perspective,
+ // so the first offset of the MemoryRecords to be appended should
equal to the next offset in the log
+ assertThrows(UnexpectedAppendOffsetException.class, () ->
log.appendAsLeader(records.apply(1L), leaderEpoch, AppendOrigin.RAFT_LEADER));
+
+ // When the first offset of the MemoryRecords to be appended equals to
the next offset in the log, append will succeed
+ log.appendAsLeader(records.apply(3L), leaderEpoch,
AppendOrigin.RAFT_LEADER);
+ assertEquals(6, log.logEndOffset());
+ }
+
+ @Test
+ public void testAppendInfoFirstOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ List<SimpleRecord> simpleRecords = List.of(
+ new SimpleRecord(mockTime.milliseconds(), "a".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "b".getBytes(),
"value".getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), "c".getBytes(),
"value".getBytes())
+ );
+
+ MemoryRecords records = LogTestUtils.records(simpleRecords);
+
+ LogAppendInfo firstAppendInfo = log.appendAsLeader(records, 0);
+ assertEquals(0, firstAppendInfo.firstOffset());
+
+ LogAppendInfo secondAppendInfo = log.appendAsLeader(
+ LogTestUtils.records(simpleRecords),
+ 0
+ );
+ assertEquals(simpleRecords.size(), secondAppendInfo.firstOffset());
+
+ log.roll();
+ LogAppendInfo afterRollAppendInfo =
log.appendAsLeader(LogTestUtils.records(simpleRecords), 0);
+ assertEquals(simpleRecords.size() * 2,
afterRollAppendInfo.firstOffset());
+ }
+
+ @Test
+ public void testTruncateBelowFirstUnstableOffset() throws IOException {
+ testTruncateBelowFirstUnstableOffset(UnifiedLog::truncateTo);
+ }
+
+ @Test
+ public void testTruncateFullyAndStartBelowFirstUnstableOffset() throws
IOException {
+ testTruncateBelowFirstUnstableOffset((log, targetOffset) ->
log.truncateFullyAndStartAt(targetOffset, Optional.empty()));
+ }
+
/**
 * Verifies that truncateFullyAndStartAt resets producer state and the first unstable
 * offset, and honours the supplied new log start offset, even when the segment
 * position of the first unstable offset is unknown (forced by a close/reopen).
 */
@Test
public void testTruncateFullyAndStart() throws IOException {
    LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
    UnifiedLog log = createLog(logDir, logConfig);
    long producerId = 17L;
    short producerEpoch = 10;
    int sequence = 0;

    // Non-transactional batch occupying offsets 0..2.
    log.appendAsLeader(LogTestUtils.records(List.of(
        new SimpleRecord("0".getBytes()),
        new SimpleRecord("1".getBytes()),
        new SimpleRecord("2".getBytes())
    )), 0);
    // Transactional batch at offsets 3..4 opens an ongoing transaction, making
    // offset 3 the first unstable offset.
    log.appendAsLeader(MemoryRecords.withTransactionalRecords(
        Compression.NONE,
        producerId,
        producerEpoch,
        sequence,
        new SimpleRecord("3".getBytes()),
        new SimpleRecord("4".getBytes())
    ), 0);
    assertEquals(Optional.of(3L), log.firstUnstableOffset());

    // We close and reopen the log to ensure that the first unstable offset segment
    // position will be undefined when we truncate the log.
    log.close();

    UnifiedLog reopened = createLog(logDir, logConfig);
    // After reopening, only the message offset (not the segment position) is known.
    assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager().firstUnstableOffset());

    // Truncate everything and restart at offset 2 with a new log start offset of 1.
    reopened.truncateFullyAndStartAt(2L, Optional.of(1L));
    // The ongoing transaction and all producer state must be discarded.
    assertEquals(Optional.empty(), reopened.firstUnstableOffset());
    assertEquals(Map.of(), reopened.producerStateManager().activeProducers());
    assertEquals(1L, reopened.logStartOffset());
    assertEquals(2L, reopened.logEndOffset());
}
+
/**
 * Shared scenario: truncating below the first unstable offset must reset producer
 * state, specifically in the case where the segment position of the first unstable
 * offset is unknown (forced here via a close/reopen before truncating).
 *
 * @param truncateFunc the truncation operation under test, given the reopened log
 *                     and the target offset
 */
private void testTruncateBelowFirstUnstableOffset(BiConsumer<UnifiedLog, Long> truncateFunc) throws IOException {
    // Verify that truncation below the first unstable offset correctly
    // resets the producer state. Specifically we are testing the case when
    // the segment position of the first unstable offset is unknown.
    LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
    UnifiedLog log = createLog(logDir, logConfig);
    long producerId = 17L;
    short producerEpoch = 10;
    int sequence = 0;

    // Non-transactional batch occupying offsets 0..2.
    log.appendAsLeader(LogTestUtils.records(List.of(
        new SimpleRecord("0".getBytes()),
        new SimpleRecord("1".getBytes()),
        new SimpleRecord("2".getBytes())
    )), 0);
    // Transactional batch at offsets 3..4 opens an ongoing transaction, making
    // offset 3 the first unstable offset.
    log.appendAsLeader(MemoryRecords.withTransactionalRecords(
        Compression.NONE,
        producerId,
        producerEpoch,
        sequence,
        new SimpleRecord("3".getBytes()),
        new SimpleRecord("4".getBytes())
    ), 0);
    assertEquals(Optional.of(3L), log.firstUnstableOffset());

    // We close and reopen the log to ensure that the first unstable offset segment
    // position will be undefined when we truncate the log.
    log.close();

    UnifiedLog reopened = createLog(logDir, logConfig);
    assertEquals(Optional.of(new LogOffsetMetadata(3L)), reopened.producerStateManager().firstUnstableOffset());

    // Truncating below the first unstable offset must discard the ongoing
    // transaction and all producer state.
    truncateFunc.accept(reopened, 0L);
    assertEquals(Optional.empty(), reopened.firstUnstableOffset());
    assertEquals(Map.of(), reopened.producerStateManager().activeProducers());
}
+
/**
 * End-to-end check of high watermark maintenance: initialization, leader/follower
 * updates, clamping to the log end offset, and adjustment on (full) truncation.
 */
@Test
public void testHighWatermarkMaintenance() throws IOException {
    LogConfig logConfig = new LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
    UnifiedLog log = createLog(logDir, logConfig);
    int leaderEpoch = 0;

    // Three-record batch with a caller-chosen base offset (follower-style batches
    // carry pre-assigned offsets).
    Function<Long, MemoryRecords> records = offset -> LogTestUtils.records(List.of(
        new SimpleRecord(mockTime.milliseconds(), "a".getBytes(), "value".getBytes()),
        new SimpleRecord(mockTime.milliseconds(), "b".getBytes(), "value".getBytes()),
        new SimpleRecord(mockTime.milliseconds(), "c".getBytes(), "value".getBytes())
    ), RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, offset, leaderEpoch);

    // High watermark initialized to 0
    assertHighWatermark(log, 0L);

    // High watermark not changed by append
    log.appendAsLeader(records.apply(0L), leaderEpoch);
    assertHighWatermark(log, 0L);

    // Update high watermark as leader
    log.maybeIncrementHighWatermark(new LogOffsetMetadata(1L));
    assertHighWatermark(log, 1L);

    // Cannot update past the log end offset (LEO is 3, so 5 is clamped to 3)
    log.updateHighWatermark(5L);
    assertHighWatermark(log, 3L);

    // Update high watermark as follower (append 3..5 first, then move HWM to 6)
    log.appendAsFollower(records.apply(3L), leaderEpoch);
    log.updateHighWatermark(6L);
    assertHighWatermark(log, 6L);

    // High watermark should be adjusted by truncation
    log.truncateTo(3L);
    assertHighWatermark(log, 3L);

    // NOTE(review): literal 0 is used as the epoch here instead of leaderEpoch;
    // the values are the same, but leaderEpoch would be more consistent.
    log.appendAsLeader(records.apply(0L), 0);
    assertHighWatermark(log, 3L);
    assertEquals(6L, log.logEndOffset());
    assertEquals(0L, log.logStartOffset());

    // Full truncation should also reset high watermark
    log.truncateFullyAndStartAt(4L, Optional.empty());
    assertEquals(4L, log.logEndOffset());
    assertEquals(4L, log.logStartOffset());
    assertHighWatermark(log, 4L);
}
+
+ private void assertHighWatermark(UnifiedLog log, long offset) throws
IOException {
+ assertEquals(offset, log.highWatermark());
+ assertValidLogOffsetMetadata(log,
log.fetchOffsetSnapshot().highWatermark());
+ }
+
+ private void assertNonEmptyFetch(UnifiedLog log, long offset,
FetchIsolation isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertTrue(readInfo.records.sizeInBytes() > 0);
+
+ long upperBoundOffset = switch (isolation) {
+ case LOG_END -> log.logEndOffset();
+ case HIGH_WATERMARK -> log.highWatermark();
+ case TXN_COMMITTED -> log.lastStableOffset();
+ };
+
+ for (Record record : readInfo.records.records())
+ assertTrue(record.offset() < upperBoundOffset);
+
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ private void assertEmptyFetch(UnifiedLog log, long offset, FetchIsolation
isolation, long batchBaseOffset) throws IOException {
+ FetchDataInfo readInfo = log.read(offset, Integer.MAX_VALUE,
isolation, true);
+ assertFalse(readInfo.firstEntryIncomplete);
+ assertEquals(0, readInfo.records.sizeInBytes());
+ assertEquals(batchBaseOffset,
readInfo.fetchOffsetMetadata.messageOffset);
+ assertValidLogOffsetMetadata(log, readInfo.fetchOffsetMetadata);
+ }
+
+ @Test
+ public void testFetchUpToLogEndOffset() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ for (long offset = log.logStartOffset(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.LOG_END,
batchBaseOffset);
+ }
+ }
+
+ @Test
+ public void testFetchUpToHighWatermark() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(ONE_MB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("0".getBytes()),
+ new SimpleRecord("1".getBytes()),
+ new SimpleRecord("2".getBytes())
+ )), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(
+ new SimpleRecord("3".getBytes()),
+ new SimpleRecord("4".getBytes())
+ )), 0);
+ TreeSet<Long> batchBaseOffsets = new TreeSet<>(List.of(0L, 3L, 5L));
+
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(3L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+
+ log.updateHighWatermark(5L);
+ assertHighWatermarkBoundedFetches(log, batchBaseOffsets);
+ }
+
+ private void assertHighWatermarkBoundedFetches(UnifiedLog log,
TreeSet<Long> batchBaseOffsets) throws IOException {
+ for (long offset = log.logStartOffset(); offset < log.highWatermark();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertNonEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
+ }
+
+ for (long offset = log.highWatermark(); offset < log.logEndOffset();
offset++) {
+ Long batchBaseOffset = batchBaseOffsets.floor(offset);
+ assertNotNull(batchBaseOffset);
+ assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset);
Review Comment:
In the Scala code the end bound of this loop is included while here it is
excluded
```
(log.highWatermark to log.logEndOffset).foreach { offset =>
val batchBaseOffset = batchBaseOffsets.rangeTo(offset).lastKey
assertEmptyFetch(log, offset, FetchIsolation.HIGH_WATERMARK,
batchBaseOffset)
}
```
I did a quick comparison and we do enter this loop 10 times in the Scala
test, but 7 in the Java test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]