frankvicky commented on code in PR #17174: URL: https://github.com/apache/kafka/pull/17174#discussion_r1759066912
########## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ########## @@ -1,666 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.log - -import kafka.utils.TestUtils -import kafka.utils.TestUtils.random -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.compress.Compression -import org.apache.kafka.common.config.TopicConfig -import org.apache.kafka.common.record._ -import org.apache.kafka.common.utils.{MockTime, Time, Utils} -import org.apache.kafka.coordinator.transaction.TransactionLogConfig -import org.apache.kafka.server.util.MockScheduler -import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile -import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log._ -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, ValueSource} - -import java.io.{File, RandomAccessFile} -import java.util.{Optional, OptionalLong} -import scala.collection._ -import scala.jdk.CollectionConverters._ - -class LogSegmentTest { - private val 
topicPartition = new TopicPartition("topic", 0) - private val segments = mutable.ArrayBuffer[LogSegment]() - private var logDir: File = _ - - /* create a segment with the given base offset */ - def createSegment(offset: Long, - indexIntervalBytes: Int = 10, - time: Time = Time.SYSTEM): LogSegment = { - val seg = LogTestUtils.createSegment(offset, logDir, indexIntervalBytes, time) - segments += seg - seg - } - - /* create a ByteBufferMessageSet for the given messages starting from the given offset */ - def records(offset: Long, records: String*): MemoryRecords = { - MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, offset, Compression.NONE, TimestampType.CREATE_TIME, - records.map { s => new SimpleRecord(offset * 10, s.getBytes) }: _*) - } - - @BeforeEach - def setup(): Unit = { - logDir = TestUtils.tempDir() - } - - @AfterEach - def teardown(): Unit = { - segments.foreach(_.close()) - Utils.delete(logDir) - } - - /** - * LogSegmentOffsetOverflowException should be thrown while appending the logs if: - * 1. largestOffset - baseOffset < 0 - * 2. 
largestOffset - baseOffset > Integer.MAX_VALUE - */ - @ParameterizedTest - @CsvSource(Array( - "0, -2147483648", - "0, 2147483648", - "1, 0", - "100, 10", - "2147483648, 0", - "-2147483648, 0", - "2147483648,4294967296" - )) - def testAppendForLogSegmentOffsetOverflowException(baseOffset: Long, largestOffset: Long): Unit = { - val seg = createSegment(baseOffset) - val currentTime = Time.SYSTEM.milliseconds() - val shallowOffsetOfMaxTimestamp = largestOffset - val memoryRecords = records(0, "hello") - assertThrows(classOf[LogSegmentOffsetOverflowException], () => { - seg.append(largestOffset, currentTime, shallowOffsetOfMaxTimestamp, memoryRecords) - }) - } - - /** - * A read on an empty log segment should return null - */ - @Test - def testReadOnEmptySegment(): Unit = { - val seg = createSegment(40) - val read = seg.read(40, 300) - assertNull(read, "Read beyond the last offset in the segment should be null") - } - - /** - * Reading from before the first offset in the segment should return messages - * beginning with the first message in the segment - */ - @Test - def testReadBeforeFirstOffset(): Unit = { - val seg = createSegment(40) - val ms = records(50, "hello", "there", "little", "bee") - seg.append(53, RecordBatch.NO_TIMESTAMP, -1L, ms) - val read = seg.read(41, 300).records - checkEquals(ms.records.iterator, read.records.iterator) - } - - /** - * If we read from an offset beyond the last offset in the segment we should get null - */ - @Test - def testReadAfterLast(): Unit = { - val seg = createSegment(40) - val ms = records(50, "hello", "there") - seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) - val read = seg.read(52, 200) - assertNull(read, "Read beyond the last offset in the segment should give null") - } - - /** - * If we read from an offset which doesn't exist we should get a message set beginning - * with the least offset greater than the given startOffset. 
- */ - @Test - def testReadFromGap(): Unit = { - val seg = createSegment(40) - val ms = records(50, "hello", "there") - seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) - val ms2 = records(60, "alpha", "beta") - seg.append(61, RecordBatch.NO_TIMESTAMP, -1L, ms2) - val read = seg.read(55, 200) - checkEquals(ms2.records.iterator, read.records.records.iterator) - } - - @ParameterizedTest(name = "testReadWhenNoMaxPosition minOneMessage = {0}") - @ValueSource(booleans = Array(true, false)) - def testReadWhenNoMaxPosition(minOneMessage: Boolean): Unit = { - val maxPosition: Optional[java.lang.Long] = Optional.empty() - val maxSize = 1 - val seg = createSegment(40) - val ms = records(50, "hello", "there") - seg.append(51, RecordBatch.NO_TIMESTAMP, -1L, ms) - // read before first offset - var read = seg.read(48, maxSize, maxPosition, minOneMessage) - assertEquals(new LogOffsetMetadata(48, 40, 0), read.fetchOffsetMetadata) - assertTrue(read.records.records().iterator().asScala.isEmpty) - // read at first offset - read = seg.read(50, maxSize, maxPosition, minOneMessage) - assertEquals(new LogOffsetMetadata(50, 40, 0), read.fetchOffsetMetadata) - assertTrue(read.records.records().iterator().asScala.isEmpty) - // read at last offset - read = seg.read(51, maxSize, maxPosition, minOneMessage) - assertEquals(new LogOffsetMetadata(51, 40, 39), read.fetchOffsetMetadata) - assertTrue(read.records.records().iterator().asScala.isEmpty) - // read at log-end-offset - read = seg.read(52, maxSize, maxPosition, minOneMessage) - assertNull(read) - // read beyond log-end-offset - read = seg.read(53, maxSize, maxPosition, minOneMessage) - assertNull(read) - } - - /** - * In a loop append two messages then truncate off the second of those messages and check that we can read - * the first but not the second message. 
- */ - @Test - def testTruncate(): Unit = { - val seg = createSegment(40) - var offset = 40 - for (_ <- 0 until 30) { - val ms1 = records(offset, "hello") - seg.append(offset, RecordBatch.NO_TIMESTAMP, -1L, ms1) - val ms2 = records(offset + 1, "hello") - seg.append(offset + 1, RecordBatch.NO_TIMESTAMP, -1L, ms2) - // check that we can read back both messages - val read = seg.read(offset, 10000) - assertEquals(List(ms1.records.iterator.next(), ms2.records.iterator.next()), read.records.records.asScala.toList) - // now truncate off the last message - seg.truncateTo(offset + 1) - val read2 = seg.read(offset, 10000) - assertEquals(1, read2.records.records.asScala.size) - checkEquals(ms1.records.iterator, read2.records.records.iterator) - offset += 1 - } - } - - @Test - def testTruncateEmptySegment(): Unit = { - // This tests the scenario in which the follower truncates to an empty segment. In this - // case we must ensure that the index is resized so that the log segment is not mistakenly - // rolled due to a full index - - val maxSegmentMs = 300000 - val time = new MockTime - val seg = createSegment(0, time = time) - // Force load indexes before closing the segment - seg.timeIndex - seg.offsetIndex - seg.close() - - val reopened = createSegment(0, time = time) - assertEquals(0, seg.timeIndex.sizeInBytes) - assertEquals(0, seg.offsetIndex.sizeInBytes) - - time.sleep(500) - reopened.truncateTo(57) - assertEquals(0, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) - assertFalse(reopened.timeIndex.isFull) - assertFalse(reopened.offsetIndex.isFull) - - var rollParams = new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP, 100L, 1024, - time.milliseconds()) - assertFalse(reopened.shouldRoll(rollParams)) - - // The segment should not be rolled even if maxSegmentMs has been exceeded - time.sleep(maxSegmentMs + 1) - assertEquals(maxSegmentMs + 1, reopened.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) - rollParams = 
new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP, 100L, 1024, time.milliseconds()) - assertFalse(reopened.shouldRoll(rollParams)) - - // But we should still roll the segment if we cannot fit the next offset - rollParams = new RollParams(maxSegmentMs, Int.MaxValue, RecordBatch.NO_TIMESTAMP, - Int.MaxValue.toLong + 200L, 1024, time.milliseconds()) - assertTrue(reopened.shouldRoll(rollParams)) - } - - @Test - def testReloadLargestTimestampAndNextOffsetAfterTruncation(): Unit = { - val numMessages = 30 - val seg = createSegment(40, 2 * records(0, "hello").sizeInBytes - 1) - var offset = 40 - for (_ <- 0 until numMessages) { - seg.append(offset, offset, offset, records(offset, "hello")) - offset += 1 - } - assertEquals(offset, seg.readNextOffset) - - val expectedNumEntries = numMessages / 2 - 1 - assertEquals(expectedNumEntries, seg.timeIndex.entries, s"Should have $expectedNumEntries time indexes") - - seg.truncateTo(41) - assertEquals(0, seg.timeIndex.entries, s"Should have 0 time indexes") - assertEquals(400L, seg.largestTimestamp, s"Largest timestamp should be 400") - assertEquals(41, seg.readNextOffset) - } - - /** - * Test truncating the whole segment, and check that we can reappend with the original offset. 
- */ - @Test - def testTruncateFull(): Unit = { - // test the case where we fully truncate the log - val time = new MockTime - val seg = createSegment(40, time = time) - seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")) - - // If the segment is empty after truncation, the create time should be reset - time.sleep(500) - assertEquals(500, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) - - seg.truncateTo(0) - assertEquals(0, seg.timeWaitedForRoll(time.milliseconds(), RecordBatch.NO_TIMESTAMP)) - assertFalse(seg.timeIndex.isFull) - assertFalse(seg.offsetIndex.isFull) - assertNull(seg.read(0, 1024), "Segment should be empty.") - - seg.append(41, RecordBatch.NO_TIMESTAMP, -1L, records(40, "hello", "there")) - } - - /** - * Append messages with timestamp and search message by timestamp. - */ - @Test - def testFindOffsetByTimestamp(): Unit = { - val messageSize = records(0, s"msg00").sizeInBytes - val seg = createSegment(40, messageSize * 2 - 1) - // Produce some messages - for (i <- 40 until 50) - seg.append(i, i * 10, i, records(i, s"msg$i")) - - assertEquals(490, seg.largestTimestamp) - // Search for an indexed timestamp - assertEquals(42, seg.findOffsetByTimestamp(420, 0L).get.offset) - assertEquals(43, seg.findOffsetByTimestamp(421, 0L).get.offset) - // Search for an un-indexed timestamp - assertEquals(43, seg.findOffsetByTimestamp(430, 0L).get.offset) - assertEquals(44, seg.findOffsetByTimestamp(431, 0L).get.offset) - // Search beyond the last timestamp - assertEquals(Optional.empty(), seg.findOffsetByTimestamp(491, 0L)) - // Search before the first indexed timestamp - assertEquals(41, seg.findOffsetByTimestamp(401, 0L).get.offset) - // Search before the first timestamp - assertEquals(40, seg.findOffsetByTimestamp(399, 0L).get.offset) - } - - /** - * Test that offsets are assigned sequentially and that the nextOffset variable is incremented - */ - @Test - def testNextOffsetCalculation(): Unit = { - val seg = 
createSegment(40) - assertEquals(40, seg.readNextOffset) - seg.append(52, RecordBatch.NO_TIMESTAMP, -1L, records(50, "hello", "there", "you")) - assertEquals(53, seg.readNextOffset) - } - - /** - * Test that we can change the file suffixes for the log and index files - */ - @Test - def testChangeFileSuffixes(): Unit = { - val seg = createSegment(40) - val logFile = seg.log.file - val indexFile = seg.offsetIndexFile - val timeIndexFile = seg.timeIndexFile - // Ensure that files for offset and time indices have not been created eagerly. - assertFalse(seg.offsetIndexFile.exists) - assertFalse(seg.timeIndexFile.exists) - seg.changeFileSuffixes("", ".deleted") Review Comment: The `.deleted` suffix could be extracted into a shared constant, since the same literal is repeated in several places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
