mimaison commented on code in PR #21686:
URL: https://github.com/apache/kafka/pull/21686#discussion_r2910326998
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void
testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
assertEquals(200, yammerMetricValue(metricName),
"Metric should be updated in finally block even when exception
occurs");
}
+
+ @Test
+ public void testReadWithMinMessage() throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
Review Comment:
We don't use `var` anywhere else in this file, or even in this module, so maybe
keep the explicit types for consistency.
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void
testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
assertEquals(200, yammerMetricValue(metricName),
"Metric should be updated in finally block even when exception
occurs");
}
+
+ @Test
+ public void testReadWithMinMessage() throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(72)
+ .build();
+ log = createLog(logDir, logConfig);
+ var messageIds = IntStream.concat(
+ IntStream.range(0, 50),
+ IntStream.iterate(50, i -> i < 200, i -> i + 7)
+ ).toArray();
+ var records = Arrays.stream(messageIds)
+ .mapToObj(id -> new
SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // now test the case that we give the offsets and use non-sequential
offsets
+ for (var i = 0; i < records.length; i++) {
+ log.appendAsFollower(
+ MemoryRecords.withRecords(messageIds[i], Compression.NONE,
0, records[i]),
+ Integer.MAX_VALUE
+ );
+ }
+
+ var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+ for (var i = 50; i < maxMessageId; i++) {
+ var offset = i;
+ var idx = IntStream.range(0, messageIds.length)
+ .filter(j -> messageIds[j] >= offset)
+ .findFirst()
+ .getAsInt();
+
+ var fetchResults = List.of(
+ log.read(i, 1, FetchIsolation.LOG_END, true),
+ log.read(i, 100000, FetchIsolation.LOG_END, true),
+ log.read(i, 100, FetchIsolation.LOG_END, true)
+ );
+ for (var fetchDataInfo : fetchResults) {
+ var read = fetchDataInfo.records.records().iterator().next();
+ assertEquals(messageIds[idx], read.offset(), "Offset read
should match message id.");
+ assertEquals(records[idx], new SimpleRecord(read), "Message
should match appended.");
+ }
+ }
+ }
+
+ @Test
+ public void testReadWithTooSmallMaxLength() throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(72)
+ .build();
+ log = createLog(logDir, logConfig);
+ var messageIds = IntStream.concat(
+ IntStream.range(0, 50),
+ IntStream.iterate(50, i -> i < 200, i -> i + 7)
+ ).toArray();
+ var records = Arrays.stream(messageIds)
+ .mapToObj(id -> new
SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // now test the case that we give the offsets and use non-sequential
offsets
+ for (var i = 0; i < records.length; i++) {
+ log.appendAsFollower(
+ MemoryRecords.withRecords(messageIds[i], Compression.NONE,
0, records[i]),
+ Integer.MAX_VALUE
+ );
+ }
+
+ var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+ for (var i = 50; i < maxMessageId; i++) {
+ assertEquals(MemoryRecords.EMPTY, log.read(i, 0,
FetchIsolation.LOG_END, false).records);
+
+ // we return an incomplete message instead of an empty one for the
case below
+ // we use this mechanism to tell consumers of the fetch request
version 2 and below that the message size is
+ // larger than the fetch size
+ // in fetch request version 3, we no longer need this as we return
oversized messages from the first non-empty
+ // partition
+ var fetchInfo = log.read(i, 1, FetchIsolation.LOG_END, false);
+ assertTrue(fetchInfo.firstEntryIncomplete);
+ assertInstanceOf(FileRecords.class, fetchInfo.records);
+ assertEquals(1, fetchInfo.records.sizeInBytes());
+ }
+ }
+
+ /**
+ * Test reading at the boundary of the log, specifically
+ * - reading from the logEndOffset should give an empty message set
+ * - reading from the maxOffset should give an empty message set
+ * - reading beyond the log end offset should throw an
OffsetOutOfRangeException
+ */
+ @Test
+ public void testReadOutOfRange() throws IOException {
+ // create empty log files to simulate a log starting at offset 1024
+ Files.createFile(LogFileUtils.logFile(logDir, 1024).toPath());
+ Files.createFile(LogFileUtils.offsetIndexFile(logDir, 1024).toPath());
+
+ // set up replica log starting with offset 1024 and with one message
(at offset 1024)
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024)
+ .build();
+ log = createLog(logDir, logConfig);
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("42".getBytes())), 0);
+
+ assertEquals(
+ 0,
+ log.read(1025, 1000, FetchIsolation.LOG_END,
true).records.sizeInBytes(),
+ "Reading at the log end offset should produce 0 byte read."
+ );
+
+ assertThrows(OffsetOutOfRangeException.class, () -> log.read(0, 1000,
FetchIsolation.LOG_END, true));
+ assertThrows(OffsetOutOfRangeException.class, () -> log.read(1026,
1000, FetchIsolation.LOG_END, true));
+ }
+
+ @Test
+ public void testFlushingEmptyActiveSegments() throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var message = MemoryRecords.withRecords(
+ Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
"Test".getBytes())
+ );
+
+ log.appendAsLeader(message, 0);
+ log.roll();
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".log")).length);
+ assertEquals(1, logDir.listFiles(f ->
f.getName().endsWith(".index")).length);
+ assertEquals(0, log.activeSegment().size());
+ log.flush(true);
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".log")).length);
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".index")).length);
+ }
+
+ /**
+ * Test that covers reads and writes on a multisegment log. This test
appends a bunch of messages
+ * and then reads them all back and checks that the message read and
offset matches what was appended.
+ */
+ @Test
+ public void testLogRolls() throws IOException, InterruptedException {
+ // create a multipart log with 100 messages
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(100)
+ .build();
+ log = createLog(logDir, logConfig);
+ var numMessages = 100;
+ var messageSets = IntStream.range(0, numMessages)
+ .mapToObj(i -> MemoryRecords.withRecords(
+ Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
String.valueOf(i).getBytes()))
+ ).toArray(MemoryRecords[]::new);
+ for (var messageSet : messageSets) {
+ log.appendAsLeader(messageSet, 0);
+ }
+ log.flush(false);
+
+ // do successive reads to ensure all our messages are there
+ var offset = 0L;
+ for (var i = 0; i < numMessages; i++) {
+ var batches = log.read(offset, 1024 * 1024,
FetchIsolation.LOG_END, true).records.batches();
+ var head = batches.iterator().next();
+ assertEquals(offset, head.lastOffset(), "Offsets not equal");
+
+ var expected = messageSets[i].records().iterator().next();
+ var actual = head.iterator().next();
+ assertEquals(expected.key(), actual.key(), "Keys not equal at
offset " + offset);
+ assertEquals(expected.value(), actual.value(), "Values not equal
at offset " + offset);
+ assertEquals(expected.timestamp(), actual.timestamp(), "Timestamps
not equal at offset " + offset);
+ offset = head.lastOffset() + 1;
+ }
+ var lastRead = log.read(numMessages, 1024 * 1024,
FetchIsolation.LOG_END, true).records;
+ assertFalse(lastRead.records().iterator().hasNext(), "Should be no
more messages");
+
+ // check that rolling the log forced a flush, the flush is async so
retry in case of failure
+ TestUtils.retryOnExceptionWithTimeout(1000L, () ->
+ assertTrue(log.recoveryPoint() >=
log.activeSegment().baseOffset(), "Log roll should have forced flush")
+ );
+ }
+
+ /**
+ * Test reads at offsets that fall within compressed message set
boundaries.
+ */
+ @Test
+ public void testCompressedMessages() throws IOException {
+ // this log should roll after every message set
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(110)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // append 2 compressed message sets, each with two messages giving
offsets 0, 1, 2, 3
+
log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+ new SimpleRecord("hello".getBytes()), new
SimpleRecord("there".getBytes())), 0);
+
log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+ new SimpleRecord("alpha".getBytes()), new
SimpleRecord("beta".getBytes())), 0);
+
+ // we should always get the first message in the compressed set when
reading any offset in the set
+ assertEquals(0, read(log, 0).iterator().next().offset(), "Read at
offset 0 should produce 0");
+ assertEquals(0, read(log, 1).iterator().next().offset(), "Read at
offset 1 should produce 0");
+ assertEquals(2, read(log, 2).iterator().next().offset(), "Read at
offset 2 should produce 2");
+ assertEquals(2, read(log, 3).iterator().next().offset(), "Read at
offset 3 should produce 2");
+ }
+
+ private Iterable<Record> read(UnifiedLog log, long offset) throws
IOException {
+ return log.read(offset, 4096, FetchIsolation.LOG_END,
true).records.records();
+ }
+
+ /**
+ * Test garbage collecting old segments
+ */
+ @Test
+ public void testThatGarbageCollectingSegmentsDoesntChangeOffset() throws
IOException {
+ for (int messagesToAppend : List.of(0, 1, 25)) {
+ logDir.mkdirs();
+ // first test a log segment starting at 0
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(100)
+ .retentionMs(0)
+ .build();
+ var testLog = createLog(logDir, logConfig);
+ for (int i = 0; i < messagesToAppend; i++) {
+
testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds() - 10, null,
String.valueOf(i).getBytes())), 0);
+ }
+
+ var currOffset = testLog.logEndOffset();
+ assertEquals(currOffset, messagesToAppend);
+
+ // time goes by; the log file is deleted
+ testLog.updateHighWatermark(currOffset);
+ testLog.deleteOldSegments();
+
+ assertEquals(currOffset, testLog.logEndOffset(), "Deleting
segments shouldn't have changed the logEndOffset");
+ assertEquals(1, testLog.numberOfSegments(), "We should still have
one segment left");
+ assertEquals(0, testLog.deleteOldSegments(), "Further collection
shouldn't delete anything");
+ assertEquals(currOffset, testLog.logEndOffset(), "Still no change
in the logEndOffset");
+ assertEquals(currOffset,
+
testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
"hello".getBytes())), 0).firstOffset(),
+ "Should still be able to append and should get the
logEndOffset assigned to the new append");
+
+ // cleanup the log
+ logsToClose.remove(testLog);
+ testLog.delete();
+ }
+ }
+
+ /**
+ * MessageSet size shouldn't exceed the config.segmentSize, check that it
is properly enforced by
+ * appending a message set larger than the config.segmentSize setting and
checking that an exception is thrown.
+ */
+ @Test
+ public void testMessageSetSizeCheck() throws IOException {
+ var messageSet = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ // append messages to log
+ var configSegmentSize = messageSet.sizeInBytes() - 1;
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(configSegmentSize)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ assertThrows(RecordBatchTooLargeException.class, () ->
log.appendAsLeader(messageSet, 0));
+ }
+
+ @Test
+ public void testCompactedTopicConstraints() throws IOException {
+ var keyedMessage = new SimpleRecord("and here it is".getBytes(), "this
message has a key".getBytes());
+ var anotherKeyedMessage = new SimpleRecord("another key".getBytes(),
"this message also has a key".getBytes());
+ var unkeyedMessage = new SimpleRecord("this message does not have a
key".getBytes());
+
+ var messageSetWithUnkeyedMessage =
MemoryRecords.withRecords(Compression.NONE, unkeyedMessage, keyedMessage);
+ var messageSetWithOneUnkeyedMessage =
MemoryRecords.withRecords(Compression.NONE, unkeyedMessage);
+ var messageSetWithCompressedKeyedMessage =
MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage);
+ var messageSetWithCompressedUnkeyedMessage =
MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage,
unkeyedMessage);
+ var messageSetWithKeyedMessage =
MemoryRecords.withRecords(Compression.NONE, keyedMessage);
+ var messageSetWithKeyedMessages =
MemoryRecords.withRecords(Compression.NONE, keyedMessage, anotherKeyedMessage);
+
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ var errorMsgPrefix = "Compacted topic cannot accept message without
key";
+
+ var e = assertThrows(RecordValidationException.class,
+ () -> log.appendAsLeader(messageSetWithUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(0, e.recordErrors().get(0).batchIndex);
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ e = assertThrows(RecordValidationException.class,
+ () -> log.appendAsLeader(messageSetWithOneUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(0, e.recordErrors().get(0).batchIndex);
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ e = assertThrows(RecordValidationException.class,
+ () ->
log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(1, e.recordErrors().get(0).batchIndex); // batch index
is 1
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
+ assertEquals(1,
KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+ .filter(k ->
k.getMBeanName().endsWith(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC))
+ .count());
+
assertTrue(meterCount(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC)
> 0);
+
+ // the following should succeed without any InvalidMessageException
+ log.appendAsLeader(messageSetWithKeyedMessage, 0);
+ log.appendAsLeader(messageSetWithKeyedMessages, 0);
+ log.appendAsLeader(messageSetWithCompressedKeyedMessage, 0);
+ }
+
+ /**
+ * We have a max size limit on message appends, check that it is properly
enforced by appending a message larger than the
+ * setting and checking that an exception is thrown.
+ */
+ @Test
+ public void testMessageSizeCheck() throws IOException {
+ var first = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ var second = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("change (I need more bytes)... blah blah
blah.".getBytes()),
+ new SimpleRecord("More padding boo hoo".getBytes()));
+
+ // append messages to log
+ var maxMessageSize = second.sizeInBytes() - 1;
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .maxMessageBytes(maxMessageSize)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // should be able to append the small message
+ log.appendAsLeader(first, 0);
+
+ assertThrows(
+ RecordTooLargeException.class,
+ () -> log.appendAsLeader(second, 0),
+ "Second message set should throw MessageSizeTooLargeException."
+ );
+ }
+
+ @Test
+ public void testMessageSizeCheckInAppendAsFollower() throws IOException {
+ var first = MemoryRecords.withRecords(0, Compression.NONE, 0,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ var second = MemoryRecords.withRecords(5, Compression.NONE, 0,
+ new SimpleRecord("change (I need more bytes)... blah blah
blah.".getBytes()),
+ new SimpleRecord("More padding boo hoo".getBytes()));
+
+ log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+ .maxMessageBytes(second.sizeInBytes() - 1)
+ .build());
+
+ log.appendAsFollower(first, Integer.MAX_VALUE);
+ // the second record is larger than limit but appendAsFollower does
not validate the size.
+ log.appendAsFollower(second, Integer.MAX_VALUE);
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(InvalidMemoryRecordsProvider.class)
+ public void testInvalidMemoryRecords(MemoryRecords records,
Optional<Class<Exception>> expectedException) throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+
+ if (expectedException.isPresent()) {
+ assertThrows(expectedException.get(), () ->
log.appendAsFollower(records, Integer.MAX_VALUE));
+ } else {
+ log.appendAsFollower(records, Integer.MAX_VALUE);
+ }
+
+ assertEquals(previousEndOffset,
log.logEndOffsetMetadata().messageOffset);
+ }
+
+ @Test
+ public void testRandomRecords() throws IOException {
+ var random = new java.util.Random();
Review Comment:
We can import `java.util.Random` and use the simple name `Random` here instead of the fully qualified name.
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1913,6 +1943,777 @@ public void
testRetentionSizeInPercentMetricUpdatedOnDeletionError() throws IOEx
assertEquals(200, yammerMetricValue(metricName),
"Metric should be updated in finally block even when exception
occurs");
}
+
+ @Test
+ public void testReadWithMinMessage() throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(72)
+ .build();
+ log = createLog(logDir, logConfig);
+ var messageIds = IntStream.concat(
+ IntStream.range(0, 50),
+ IntStream.iterate(50, i -> i < 200, i -> i + 7)
+ ).toArray();
+ var records = Arrays.stream(messageIds)
+ .mapToObj(id -> new
SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // now test the case that we give the offsets and use non-sequential
offsets
+ for (var i = 0; i < records.length; i++) {
+ log.appendAsFollower(
+ MemoryRecords.withRecords(messageIds[i], Compression.NONE,
0, records[i]),
+ Integer.MAX_VALUE
+ );
+ }
+
+ var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+ for (var i = 50; i < maxMessageId; i++) {
+ var offset = i;
+ var idx = IntStream.range(0, messageIds.length)
+ .filter(j -> messageIds[j] >= offset)
+ .findFirst()
+ .getAsInt();
+
+ var fetchResults = List.of(
+ log.read(i, 1, FetchIsolation.LOG_END, true),
+ log.read(i, 100000, FetchIsolation.LOG_END, true),
+ log.read(i, 100, FetchIsolation.LOG_END, true)
+ );
+ for (var fetchDataInfo : fetchResults) {
+ var read = fetchDataInfo.records.records().iterator().next();
+ assertEquals(messageIds[idx], read.offset(), "Offset read
should match message id.");
+ assertEquals(records[idx], new SimpleRecord(read), "Message
should match appended.");
+ }
+ }
+ }
+
+ @Test
+ public void testReadWithTooSmallMaxLength() throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(72)
+ .build();
+ log = createLog(logDir, logConfig);
+ var messageIds = IntStream.concat(
+ IntStream.range(0, 50),
+ IntStream.iterate(50, i -> i < 200, i -> i + 7)
+ ).toArray();
+ var records = Arrays.stream(messageIds)
+ .mapToObj(id -> new
SimpleRecord(String.valueOf(id).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ // now test the case that we give the offsets and use non-sequential
offsets
+ for (var i = 0; i < records.length; i++) {
+ log.appendAsFollower(
+ MemoryRecords.withRecords(messageIds[i], Compression.NONE,
0, records[i]),
+ Integer.MAX_VALUE
+ );
+ }
+
+ var maxMessageId = Arrays.stream(messageIds).max().getAsInt();
+ for (var i = 50; i < maxMessageId; i++) {
+ assertEquals(MemoryRecords.EMPTY, log.read(i, 0,
FetchIsolation.LOG_END, false).records);
+
+ // we return an incomplete message instead of an empty one for the
case below
+ // we use this mechanism to tell consumers of the fetch request
version 2 and below that the message size is
+ // larger than the fetch size
+ // in fetch request version 3, we no longer need this as we return
oversized messages from the first non-empty
+ // partition
+ var fetchInfo = log.read(i, 1, FetchIsolation.LOG_END, false);
+ assertTrue(fetchInfo.firstEntryIncomplete);
+ assertInstanceOf(FileRecords.class, fetchInfo.records);
+ assertEquals(1, fetchInfo.records.sizeInBytes());
+ }
+ }
+
+ /**
+ * Test reading at the boundary of the log, specifically
+ * - reading from the logEndOffset should give an empty message set
+ * - reading from the maxOffset should give an empty message set
+ * - reading beyond the log end offset should throw an
OffsetOutOfRangeException
+ */
+ @Test
+ public void testReadOutOfRange() throws IOException {
+ // create empty log files to simulate a log starting at offset 1024
+ Files.createFile(LogFileUtils.logFile(logDir, 1024).toPath());
+ Files.createFile(LogFileUtils.offsetIndexFile(logDir, 1024).toPath());
+
+ // set up replica log starting with offset 1024 and with one message
(at offset 1024)
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(1024)
+ .build();
+ log = createLog(logDir, logConfig);
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("42".getBytes())), 0);
+
+ assertEquals(
+ 0,
+ log.read(1025, 1000, FetchIsolation.LOG_END,
true).records.sizeInBytes(),
+ "Reading at the log end offset should produce 0 byte read."
+ );
+
+ assertThrows(OffsetOutOfRangeException.class, () -> log.read(0, 1000,
FetchIsolation.LOG_END, true));
+ assertThrows(OffsetOutOfRangeException.class, () -> log.read(1026,
1000, FetchIsolation.LOG_END, true));
+ }
+
+ @Test
+ public void testFlushingEmptyActiveSegments() throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var message = MemoryRecords.withRecords(
+ Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
"Test".getBytes())
+ );
+
+ log.appendAsLeader(message, 0);
+ log.roll();
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".log")).length);
+ assertEquals(1, logDir.listFiles(f ->
f.getName().endsWith(".index")).length);
+ assertEquals(0, log.activeSegment().size());
+ log.flush(true);
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".log")).length);
+ assertEquals(2, logDir.listFiles(f ->
f.getName().endsWith(".index")).length);
+ }
+
+ /**
+ * Test that covers reads and writes on a multisegment log. This test
appends a bunch of messages
+ * and then reads them all back and checks that the message read and
offset matches what was appended.
+ */
+ @Test
+ public void testLogRolls() throws IOException, InterruptedException {
+ // create a multipart log with 100 messages
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(100)
+ .build();
+ log = createLog(logDir, logConfig);
+ var numMessages = 100;
+ var messageSets = IntStream.range(0, numMessages)
+ .mapToObj(i -> MemoryRecords.withRecords(
+ Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
String.valueOf(i).getBytes()))
+ ).toArray(MemoryRecords[]::new);
+ for (var messageSet : messageSets) {
+ log.appendAsLeader(messageSet, 0);
+ }
+ log.flush(false);
+
+ // do successive reads to ensure all our messages are there
+ var offset = 0L;
+ for (var i = 0; i < numMessages; i++) {
+ var batches = log.read(offset, 1024 * 1024,
FetchIsolation.LOG_END, true).records.batches();
+ var head = batches.iterator().next();
+ assertEquals(offset, head.lastOffset(), "Offsets not equal");
+
+ var expected = messageSets[i].records().iterator().next();
+ var actual = head.iterator().next();
+ assertEquals(expected.key(), actual.key(), "Keys not equal at
offset " + offset);
+ assertEquals(expected.value(), actual.value(), "Values not equal
at offset " + offset);
+ assertEquals(expected.timestamp(), actual.timestamp(), "Timestamps
not equal at offset " + offset);
+ offset = head.lastOffset() + 1;
+ }
+ var lastRead = log.read(numMessages, 1024 * 1024,
FetchIsolation.LOG_END, true).records;
+ assertFalse(lastRead.records().iterator().hasNext(), "Should be no
more messages");
+
+ // check that rolling the log forced a flush, the flush is async so
retry in case of failure
+ TestUtils.retryOnExceptionWithTimeout(1000L, () ->
+ assertTrue(log.recoveryPoint() >=
log.activeSegment().baseOffset(), "Log roll should have forced flush")
+ );
+ }
+
+ /**
+ * Test reads at offsets that fall within compressed message set
boundaries.
+ */
+ @Test
+ public void testCompressedMessages() throws IOException {
+ // this log should roll after every message set
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(110)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // append 2 compressed message sets, each with two messages giving
offsets 0, 1, 2, 3
+
log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+ new SimpleRecord("hello".getBytes()), new
SimpleRecord("there".getBytes())), 0);
+
log.appendAsLeader(MemoryRecords.withRecords(Compression.gzip().build(),
+ new SimpleRecord("alpha".getBytes()), new
SimpleRecord("beta".getBytes())), 0);
+
+ // we should always get the first message in the compressed set when
reading any offset in the set
+ assertEquals(0, read(log, 0).iterator().next().offset(), "Read at
offset 0 should produce 0");
+ assertEquals(0, read(log, 1).iterator().next().offset(), "Read at
offset 1 should produce 0");
+ assertEquals(2, read(log, 2).iterator().next().offset(), "Read at
offset 2 should produce 2");
+ assertEquals(2, read(log, 3).iterator().next().offset(), "Read at
offset 3 should produce 2");
+ }
+
+ private Iterable<Record> read(UnifiedLog log, long offset) throws
IOException {
+ return log.read(offset, 4096, FetchIsolation.LOG_END,
true).records.records();
+ }
+
+ /**
+ * Test garbage collecting old segments
+ */
+ @Test
+ public void testThatGarbageCollectingSegmentsDoesntChangeOffset() throws
IOException {
+ for (int messagesToAppend : List.of(0, 1, 25)) {
+ logDir.mkdirs();
+ // first test a log segment starting at 0
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(100)
+ .retentionMs(0)
+ .build();
+ var testLog = createLog(logDir, logConfig);
+ for (int i = 0; i < messagesToAppend; i++) {
+
testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds() - 10, null,
String.valueOf(i).getBytes())), 0);
+ }
+
+ var currOffset = testLog.logEndOffset();
+ assertEquals(currOffset, messagesToAppend);
+
+ // time goes by; the log file is deleted
+ testLog.updateHighWatermark(currOffset);
+ testLog.deleteOldSegments();
+
+ assertEquals(currOffset, testLog.logEndOffset(), "Deleting
segments shouldn't have changed the logEndOffset");
+ assertEquals(1, testLog.numberOfSegments(), "We should still have
one segment left");
+ assertEquals(0, testLog.deleteOldSegments(), "Further collection
shouldn't delete anything");
+ assertEquals(currOffset, testLog.logEndOffset(), "Still no change
in the logEndOffset");
+ assertEquals(currOffset,
+
testLog.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord(mockTime.milliseconds(), null,
"hello".getBytes())), 0).firstOffset(),
+ "Should still be able to append and should get the
logEndOffset assigned to the new append");
+
+ // cleanup the log
+ logsToClose.remove(testLog);
+ testLog.delete();
+ }
+ }
+
+ /**
+ * MessageSet size shouldn't exceed the config.segmentSize, check that it
is properly enforced by
+ * appending a message set larger than the config.segmentSize setting and
checking that an exception is thrown.
+ */
+ @Test
+ public void testMessageSetSizeCheck() throws IOException {
+ var messageSet = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ // append messages to log
+ var configSegmentSize = messageSet.sizeInBytes() - 1;
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(configSegmentSize)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ assertThrows(RecordBatchTooLargeException.class, () ->
log.appendAsLeader(messageSet, 0));
+ }
+
+ @Test
+ public void testCompactedTopicConstraints() throws IOException {
+ var keyedMessage = new SimpleRecord("and here it is".getBytes(), "this
message has a key".getBytes());
+ var anotherKeyedMessage = new SimpleRecord("another key".getBytes(),
"this message also has a key".getBytes());
+ var unkeyedMessage = new SimpleRecord("this message does not have a
key".getBytes());
+
+ var messageSetWithUnkeyedMessage =
MemoryRecords.withRecords(Compression.NONE, unkeyedMessage, keyedMessage);
+ var messageSetWithOneUnkeyedMessage =
MemoryRecords.withRecords(Compression.NONE, unkeyedMessage);
+ var messageSetWithCompressedKeyedMessage =
MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage);
+ var messageSetWithCompressedUnkeyedMessage =
MemoryRecords.withRecords(Compression.gzip().build(), keyedMessage,
unkeyedMessage);
+ var messageSetWithKeyedMessage =
MemoryRecords.withRecords(Compression.NONE, keyedMessage);
+ var messageSetWithKeyedMessages =
MemoryRecords.withRecords(Compression.NONE, keyedMessage, anotherKeyedMessage);
+
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ var errorMsgPrefix = "Compacted topic cannot accept message without
key";
+
+ var e = assertThrows(RecordValidationException.class,
+ () -> log.appendAsLeader(messageSetWithUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(0, e.recordErrors().get(0).batchIndex);
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ e = assertThrows(RecordValidationException.class,
+ () -> log.appendAsLeader(messageSetWithOneUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(0, e.recordErrors().get(0).batchIndex);
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ e = assertThrows(RecordValidationException.class,
+ () ->
log.appendAsLeader(messageSetWithCompressedUnkeyedMessage, 0));
+ assertInstanceOf(InvalidRecordException.class, e.invalidException());
+ assertEquals(1, e.recordErrors().size());
+ assertEquals(1, e.recordErrors().get(0).batchIndex); // batch index
is 1
+ assertTrue(e.recordErrors().get(0).message.startsWith(errorMsgPrefix));
+
+ // check if metric for NoKeyCompactedTopicRecordsPerSec is logged
+ assertEquals(1,
KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+ .filter(k ->
k.getMBeanName().endsWith(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC))
+ .count());
+
assertTrue(meterCount(BrokerTopicMetrics.NO_KEY_COMPACTED_TOPIC_RECORDS_PER_SEC)
> 0);
+
+ // the following should succeed without any InvalidMessageException
+ log.appendAsLeader(messageSetWithKeyedMessage, 0);
+ log.appendAsLeader(messageSetWithKeyedMessages, 0);
+ log.appendAsLeader(messageSetWithCompressedKeyedMessage, 0);
+ }
+
+ /**
+ * We have a max size limit on message appends, check that it is properly
enforced by appending a message larger than the
+ * setting and checking that an exception is thrown.
+ */
+ @Test
+ public void testMessageSizeCheck() throws IOException {
+ var first = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ var second = MemoryRecords.withRecords(Compression.NONE,
+ new SimpleRecord("change (I need more bytes)... blah blah
blah.".getBytes()),
+ new SimpleRecord("More padding boo hoo".getBytes()));
+
+ // append messages to log
+ var maxMessageSize = second.sizeInBytes() - 1;
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .maxMessageBytes(maxMessageSize)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ // should be able to append the small message
+ log.appendAsLeader(first, 0);
+
+ assertThrows(
+ RecordTooLargeException.class,
+ () -> log.appendAsLeader(second, 0),
+ "Second message set should throw MessageSizeTooLargeException."
+ );
+ }
+
+ @Test
+ public void testMessageSizeCheckInAppendAsFollower() throws IOException {
+ var first = MemoryRecords.withRecords(0, Compression.NONE, 0,
+ new SimpleRecord("You".getBytes()), new
SimpleRecord("bethe".getBytes()));
+ var second = MemoryRecords.withRecords(5, Compression.NONE, 0,
+ new SimpleRecord("change (I need more bytes)... blah blah
blah.".getBytes()),
+ new SimpleRecord("More padding boo hoo".getBytes()));
+
+ log = createLog(logDir, new LogTestUtils.LogConfigBuilder()
+ .maxMessageBytes(second.sizeInBytes() - 1)
+ .build());
+
+ log.appendAsFollower(first, Integer.MAX_VALUE);
+ // the second record is larger than limit but appendAsFollower does
not validate the size.
+ log.appendAsFollower(second, Integer.MAX_VALUE);
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(InvalidMemoryRecordsProvider.class)
+ public void testInvalidMemoryRecords(MemoryRecords records,
Optional<Class<Exception>> expectedException) throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+
+ if (expectedException.isPresent()) {
+ assertThrows(expectedException.get(), () ->
log.appendAsFollower(records, Integer.MAX_VALUE));
+ } else {
+ log.appendAsFollower(records, Integer.MAX_VALUE);
+ }
+
+ assertEquals(previousEndOffset,
log.logEndOffsetMetadata().messageOffset);
+ }
+
+ @Test
+ public void testRandomRecords() throws IOException {
+ var random = new java.util.Random();
+ for (int i = 0; i < 100; i++) {
+ var size = random.nextInt(128) + 1;
+ var bytes = new byte[size];
+ random.nextBytes(bytes);
+ var records =
MemoryRecords.readableRecords(ByteBuffer.wrap(bytes));
+
+ var tempDir = TestUtils.tempDirectory();
+ var randomLogDir = TestUtils.randomPartitionLogDir(tempDir);
+ var testLog = createLog(randomLogDir, new LogConfig(new
Properties()));
+ try {
+ var previousEndOffset =
testLog.logEndOffsetMetadata().messageOffset;
+
+ // Depending on the corruption, unified log sometimes throws
and sometimes returns an
+ // empty set of batches
+ assertThrows(CorruptRecordException.class, () -> {
+ var info = testLog.appendAsFollower(records,
Integer.MAX_VALUE);
+ if (info.firstOffset() == UnifiedLog.UNKNOWN_OFFSET) {
+ throw new CorruptRecordException("Unknown offset is
test");
+ }
+ });
+
+ assertEquals(previousEndOffset,
testLog.logEndOffsetMetadata().messageOffset);
+ } finally {
+ logsToClose.remove(testLog);
+ testLog.close();
+ Utils.delete(tempDir);
+ }
+ }
+ }
+
+ @Test
+ public void testInvalidLeaderEpoch() throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var previousEndOffset = log.logEndOffsetMetadata().messageOffset;
+ var epoch = log.latestEpoch().orElse(0) + 1;
+ var numberOfRecords = 10;
+
+ var recordsForBatch = IntStream.range(0, numberOfRecords)
+ .mapToObj(n -> new SimpleRecord(String.valueOf(n).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ var batchWithValidEpoch = MemoryRecords.withRecords(
+ previousEndOffset, Compression.NONE, epoch, recordsForBatch);
+
+ var batchWithInvalidEpoch = MemoryRecords.withRecords(
+ previousEndOffset + numberOfRecords, Compression.NONE, epoch +
1, recordsForBatch);
+
+ var buffer = ByteBuffer.allocate(batchWithValidEpoch.sizeInBytes() +
batchWithInvalidEpoch.sizeInBytes());
+ buffer.put(batchWithValidEpoch.buffer());
+ buffer.put(batchWithInvalidEpoch.buffer());
+ buffer.flip();
+
+ var combinedRecords = MemoryRecords.readableRecords(buffer);
+ log.appendAsFollower(combinedRecords, epoch);
+
+ // Check that only the first batch was appended
+ assertEquals(previousEndOffset + numberOfRecords,
log.logEndOffsetMetadata().messageOffset);
+ // Check that the last fetched epoch matches the first batch
+ assertEquals(epoch, (int) log.latestEpoch().get());
+ }
+
+ @Test
+ public void testLogFlushesPartitionMetadataOnAppend() throws IOException {
+ log = createLog(logDir, new LogConfig(new Properties()));
+ var record = MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("simpleValue".getBytes()));
+
+ var topicId = Uuid.randomUuid();
+ log.partitionMetadataFile().get().record(topicId);
+
+ // Should trigger a synchronous flush
+ log.appendAsLeader(record, 0);
+ assertTrue(log.partitionMetadataFile().get().exists());
+ assertEquals(topicId,
log.partitionMetadataFile().get().read().topicId());
+ }
+
+ @Test
+ public void testLogFlushesPartitionMetadataOnClose() throws IOException {
+ var logConfig = new LogConfig(new Properties());
+ var firstLog = createLog(logDir, logConfig);
+ var topicId = Uuid.randomUuid();
+ firstLog.partitionMetadataFile().get().record(topicId);
+
+ // Should trigger a synchronous flush
+ firstLog.close();
+
+ // We open the log again, and the partition metadata file should exist
with the same ID.
+ log = createLog(logDir, logConfig);
+ assertTrue(log.partitionMetadataFile().get().exists());
+ assertEquals(topicId,
log.partitionMetadataFile().get().read().topicId());
+ }
+
+ @Test
+ public void testLogRecoversTopicId() throws IOException {
+ var logConfig = new LogConfig(new Properties());
+ var firstLog = createLog(logDir, logConfig);
+ var topicId = Uuid.randomUuid();
+ firstLog.assignTopicId(topicId);
+ firstLog.close();
+
+ // test recovery case
+ log = createLog(logDir, logConfig);
+ assertTrue(log.topicId().isPresent());
+ assertEquals(topicId, log.topicId().get());
+ }
+
+ @Test
+ public void testLogFailsWhenInconsistentTopicIdSet() throws IOException {
+ var logConfig = new LogConfig(new Properties());
+ var firstLog = createLog(logDir, logConfig);
+ var topicId = Uuid.randomUuid();
+ firstLog.assignTopicId(topicId);
+ firstLog.close();
+
+ // test creating a log with a new ID
+ assertThrows(InconsistentTopicIdException.class, () ->
+ createLog(logDir, logConfig, 0L, 0L, brokerTopicStats,
mockTime.scheduler, mockTime,
+ producerStateManagerConfig, false,
Optional.of(Uuid.randomUuid()), false));
+ }
+
+ /**
+ * Test building the time index on the follower by setting assignOffsets
to false.
+ */
+ @Test
+ public void testBuildTimeIndexWhenNotAssigningOffsets() throws IOException
{
+ var numMessages = 100;
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(10000)
+ .indexIntervalBytes(1)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ for (int i = 0; i < numMessages; i++) {
+ log.appendAsFollower(
+ MemoryRecords.withRecords(100 + i, Compression.NONE, 0,
+ new SimpleRecord(mockTime.milliseconds() + i,
String.valueOf(i).getBytes())),
+ Integer.MAX_VALUE);
+ }
+
+ var timeIndexEntries = log.logSegments().stream()
+ .mapToInt(segment -> {
+ try {
+ return segment.timeIndex().entries();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).sum();
+ assertEquals(numMessages - 1, timeIndexEntries,
+ "There should be " + (numMessages - 1) + " time index
entries");
+ assertEquals(mockTime.milliseconds() + numMessages - 1,
+ log.activeSegment().timeIndex().lastEntry().timestamp(),
+ "The last time index entry should have timestamp " +
(mockTime.milliseconds() + numMessages - 1));
+ }
+
+ @Test
+ public void testFetchOffsetByTimestampIncludesLeaderEpoch() throws
IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ assertEquals(new OffsetResultHolder(Optional.empty()),
+ log.fetchOffsetByTimestamp(0L, Optional.empty()));
+
+ var firstTimestamp = mockTime.milliseconds();
+ var firstLeaderEpoch = 0;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), firstLeaderEpoch);
+
+ var secondTimestamp = firstTimestamp + 1;
+ var secondLeaderEpoch = 1;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
secondTimestamp), secondLeaderEpoch);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch))),
+ log.fetchOffsetByTimestamp(firstTimestamp, Optional.empty()));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(secondTimestamp, 1L,
Optional.of(secondLeaderEpoch))),
+ log.fetchOffsetByTimestamp(secondTimestamp, Optional.empty()));
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
Optional.empty()));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
Optional.empty()));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(secondLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty()));
+
+ // The cache can be updated directly after a leader change.
+ // The new latest offset should reflect the updated epoch.
+ log.assignEpochStartOffset(2, 2L);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(2))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.empty()));
+ }
+
+ @Test
+ public void testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp()
throws IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ assertEquals(new OffsetResultHolder(Optional.empty()),
+ log.fetchOffsetByTimestamp(0L, Optional.empty()));
+
+ var firstTimestamp = mockTime.milliseconds();
+ var leaderEpoch = 0;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), leaderEpoch);
+
+ var secondTimestamp = firstTimestamp + 1;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
secondTimestamp), leaderEpoch);
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), leaderEpoch);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(secondTimestamp, 1L, Optional.of(leaderEpoch))),
+ log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP,
Optional.empty()));
+ }
+
+ @Test
+ public void testFetchOffsetByTimestampFromRemoteStorage() throws Exception
{
+ var config = createKafkaConfigWithRLM();
+ var purgatory = new
DelayedOperationPurgatory<DelayedRemoteListOffsets>("RemoteListOffsets",
config.brokerId());
+ var remoteLogManager = spy(new
RemoteLogManager(config.remoteLogManagerConfig(),
+ 0,
+ logDir.getAbsolutePath(),
+ "clusterId",
+ mockTime,
+ tp -> Optional.empty(),
+ (tp, offset) -> { },
+ brokerTopicStats,
+ new Metrics(),
+ Optional.empty()));
+ remoteLogManager.setDelayedOperationPurgatory(purgatory);
+
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .remoteLogStorageEnable(true)
+ .build();
+ log = createLog(logDir, logConfig, true);
+
+ // Note that the log is empty, so remote offset read won't happen
+ assertEquals(new OffsetResultHolder(Optional.empty()),
+ log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)));
+
+ var firstTimestamp = mockTime.milliseconds();
+ var firstLeaderEpoch = 0;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), firstLeaderEpoch);
+
+ var secondTimestamp = firstTimestamp + 1;
+ var secondLeaderEpoch = 1;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
secondTimestamp), secondLeaderEpoch);
+
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ return Optional.of(timestamp)
+ .filter(t -> t == firstTimestamp)
+ .map(t -> new FileRecords.TimestampAndOffset(t, 0L,
Optional.of(firstLeaderEpoch)));
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()), anyLong(), anyLong(),
eq(log.leaderEpochCache()));
+ log.updateLocalLogStartOffset(1);
+
+ // In the assertions below we test that offset 0 (first timestamp) is
in remote and offset 1 (second timestamp) is in local storage.
+ assertFetchOffsetByTimestamp(remoteLogManager, new
FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch)), firstTimestamp, log);
+ assertFetchOffsetByTimestamp(remoteLogManager, new
FileRecords.TimestampAndOffset(secondTimestamp, 1L,
Optional.of(secondLeaderEpoch)), secondTimestamp, log);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(secondLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+
+ log.assignEpochStartOffset(2, 2L);
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(2))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+ }
+
+ @Test
+ public void testFetchLatestTieredTimestampNoRemoteStorage() throws
IOException {
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .build();
+ log = createLog(logDir, logConfig);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1,
Optional.of(-1))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP,
Optional.empty()));
+
+ var firstTimestamp = mockTime.milliseconds();
+ var leaderEpoch = 0;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), leaderEpoch);
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp + 1), leaderEpoch);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, -1,
Optional.of(-1))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP,
Optional.empty()));
+ }
+
+ @Test
+ public void testFetchLatestTieredTimestampWithRemoteStorage() throws
Exception {
+ var config = createKafkaConfigWithRLM();
+ var purgatory = new
DelayedOperationPurgatory<DelayedRemoteListOffsets>("RemoteListOffsets",
config.brokerId());
+ var remoteLogManager = spy(new
RemoteLogManager(config.remoteLogManagerConfig(),
+ 0,
+ logDir.getAbsolutePath(),
+ "clusterId",
+ mockTime,
+ tp -> Optional.empty(),
+ (tp, offset) -> { },
+ brokerTopicStats,
+ new Metrics(),
+ Optional.empty()));
+ remoteLogManager.setDelayedOperationPurgatory(purgatory);
+
+ var logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(200)
+ .indexIntervalBytes(1)
+ .remoteLogStorageEnable(true)
+ .build();
+ log = createLog(logDir, logConfig, true);
+
+ // Note that the log is empty, so remote offset read won't happen
+ assertEquals(new OffsetResultHolder(Optional.empty()),
+ log.fetchOffsetByTimestamp(0L, Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0,
Optional.empty())),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
Optional.of(remoteLogManager)));
+
+ var firstTimestamp = mockTime.milliseconds();
+ var firstLeaderEpoch = 0;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
firstTimestamp), firstLeaderEpoch);
+
+ var secondTimestamp = firstTimestamp + 1;
+ var secondLeaderEpoch = 1;
+ log.appendAsLeader(singletonRecords(TestUtils.randomBytes(10),
secondTimestamp), secondLeaderEpoch);
+
+ doAnswer(ans -> {
+ long timestamp = ans.getArgument(1);
+ return Optional.of(timestamp)
+ .filter(t -> t == firstTimestamp)
+ .map(t -> new FileRecords.TimestampAndOffset(t, 0L,
Optional.of(firstLeaderEpoch)));
+ }).when(remoteLogManager).findOffsetByTimestamp(
+ eq(log.topicPartition()), anyLong(), anyLong(),
eq(log.leaderEpochCache()));
+ log.updateLocalLogStartOffset(1);
+ log.updateHighestOffsetInRemoteStorage(0);
+
+ // In the assertions below we test that offset 0 (first timestamp) is
in remote and offset 1 (second timestamp) is in local storage.
+ assertFetchOffsetByTimestamp(remoteLogManager, new
FileRecords.TimestampAndOffset(firstTimestamp, 0L,
Optional.of(firstLeaderEpoch)), firstTimestamp, log);
+ assertFetchOffsetByTimestamp(remoteLogManager, new
FileRecords.TimestampAndOffset(secondTimestamp, 1L,
Optional.of(secondLeaderEpoch)), secondTimestamp, log);
+
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L,
Optional.of(firstLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP,
Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L,
Optional.of(secondLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP,
Optional.of(remoteLogManager)));
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(secondLeaderEpoch))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+
+ log.assignEpochStartOffset(2, 2L);
+ assertEquals(new OffsetResultHolder(new
FileRecords.TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L,
Optional.of(2))),
+
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
Optional.of(remoteLogManager)));
+ }
+
+ private void assertFetchOffsetByTimestamp(RemoteLogManager
remoteLogManager,
+ FileRecords.TimestampAndOffset
expected,
+ long timestamp,
+ UnifiedLog testLog) throws
Exception {
+ OffsetResultHolder offsetResultHolder =
testLog.fetchOffsetByTimestamp(timestamp, Optional.of(remoteLogManager));
+ assertTrue(offsetResultHolder.futureHolderOpt().isPresent());
+ offsetResultHolder.futureHolderOpt().get().taskFuture().get(1,
TimeUnit.SECONDS);
+
assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().isDone());
+
assertTrue(offsetResultHolder.futureHolderOpt().get().taskFuture().get().hasTimestampAndOffset());
+ assertEquals(expected,
offsetResultHolder.futureHolderOpt().get().taskFuture().get().timestampAndOffset().orElse(null));
+ }
+
+ private KafkaConfig createKafkaConfigWithRLM() {
Review Comment:
Could we get rid of this method? It seems all the tests only require a broker
id and an instance of `RemoteLogManagerConfig`
My thinking is that this would remove the dependency on `KafkaConfig`
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -41,20 +41,16 @@ import org.apache.kafka.server.storage.log.{FetchIsolation,
UnexpectedAppendOffs
import org.apache.kafka.server.util.{MockTime, Scheduler}
import
org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile,
PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
RecordValidationException, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin,
AsyncOffsetReader, Cleaner, LogConfig, LogFileUtils, LogOffsetMetadata,
LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments,
LogStartOffsetIncrementReason, LogToClean, OffsetResultHolder,
OffsetsOutOfOrderException, ProducerStateManager, ProducerStateManagerConfig,
UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
-import org.apache.kafka.storage.log.metrics.{BrokerTopicMetrics,
BrokerTopicStats}
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, _}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ArgumentsSource
import org.junit.jupiter.params.provider.{EnumSource, ValueSource}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito.{doAnswer, doThrow, spy}
-import net.jqwik.api.AfterFailureMode
-import net.jqwik.api.ForAll
-import net.jqwik.api.Property
Review Comment:
I don't think we use `net.jqwik` anywhere else in `core`. Can we remove
`testImplementation libs.jqwik` from `build.gradle`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]