junrao commented on code in PR #19214:
URL: https://github.com/apache/kafka/pull/19214#discussion_r1999287436


##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two condition
+     * 1. If the target offset equals to the base offset of the first batch
+     * 2. If the target offset < the base offset of the first batch
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {5, 10})
+    public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset) throws IOException {

Review Comment:
   This is quite a long name, and the `LastOffsetCallCount` part doesn't seem to convey much.



##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two condition

Review Comment:
   two condition => two conditions



##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two condition
+     * 1. If the target offset equals to the base offset of the first batch
+     * 2. If the target offset < the base offset of the first batch
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {5, 10})
+    public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset) throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(baseOffset);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        verify(batch, never()).lastOffset();
+    }
+
+    @Test
+    public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchLastOffsetMatch() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(3L);
+        when(batch.lastOffset()).thenReturn(5L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        // target is equals to the last offset of the batch, we should call lastOffset

Review Comment:
   is equals to => is equal to



##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two condition
+     * 1. If the target offset equals to the base offset of the first batch

Review Comment:
   equals to => is equal to



##########
clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java:
##########
@@ -518,6 +523,176 @@ public void testBytesLengthOfWriteTo() throws IOException {
         verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten));
     }
 
+    /**
+     * Test two condition
+     * 1. If the target offset equals to the base offset of the first batch
+     * 2. If the target offset < the base offset of the first batch
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {5, 10})
+    public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchMatch(long baseOffset) throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(baseOffset);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        verify(batch, never()).lastOffset();
+    }
+
+    @Test
+    public void testSearchForOffsetWithSizeLastOffsetCallCountFirstBatchLastOffsetMatch() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch.baseOffset()).thenReturn(3L);
+        when(batch.lastOffset()).thenReturn(5L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(5L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch), result);
+        // target is equals to the last offset of the batch, we should call lastOffset
+        verify(batch, times(1)).lastOffset();
+    }
+
+    @Test
+    public void testSearchForOffsetWithSizeLastOffsetCallCountLastBatchLastOffsetMatch() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(prevBatch.baseOffset()).thenReturn(5L);
+        when(prevBatch.lastOffset()).thenReturn(12L);
+        FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(currentBatch.baseOffset()).thenReturn(15L);
+        when(currentBatch.lastOffset()).thenReturn(20L);
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(20L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(currentBatch), result);
+        // Because the target offset is in the current batch, we should not call lastOffset in the previous batch
+        verify(prevBatch, never()).lastOffset();
+        verify(currentBatch, times(1)).lastOffset();
+    }
+
+    @Test
+    public void testSearchForOffsetWithSizeLastOffsetCallCountPrevBatchMatches() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch prevBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(prevBatch.baseOffset()).thenReturn(5L);
+        when(prevBatch.lastOffset()).thenReturn(12L); // > targetOffset
+        FileLogInputStream.FileChannelRecordBatch currentBatch = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(currentBatch.baseOffset()).thenReturn(15L); // >= targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, prevBatch, currentBatch);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(10L, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(prevBatch), result);
+        // Because the target offset is in the current batch, we should call lastOffset
+        // on the previous batch
+        verify(prevBatch, times(1)).lastOffset();
+    }
+
+    @Test
+    public void testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchDoesntMatch() throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch1.baseOffset()).thenReturn(5L);  // < targetOffset
+        FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch2.baseOffset()).thenReturn(8L);  // < targetOffset
+        when(batch2.lastOffset()).thenReturn(9L);  // < targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch1, batch2);
+
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(10L, 0);
+
+        assertNull(result);
+        // Because the target offset is exceeded by the last offset of the batch2,
+        // we should call lastOffset on the batch2
+        verify(batch1, never()).lastOffset();
+        verify(batch2, times(1)).lastOffset();
+    }
+
+    /**
+     * Test two condition
+     * 1. If the target offset < the base offset of the last batch
+     * 2. If the target offset equals to the base offset of the last batch
+     */
+    @ParameterizedTest
+    @ValueSource(longs = {8, 10})
+    public void testSearchForOffsetWithSizeLastOffsetCallCountAllBatchesSmallerLastBatchMatches(long baseOffset) throws IOException {
+        File mockFile = mock(File.class);
+        FileChannel mockChannel = mock(FileChannel.class);
+        FileLogInputStream.FileChannelRecordBatch batch1 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch1.baseOffset()).thenReturn(5L);  // < targetOffset
+        FileLogInputStream.FileChannelRecordBatch batch2 = mock(FileLogInputStream.FileChannelRecordBatch.class);
+        when(batch2.baseOffset()).thenReturn(baseOffset);  // < targetOffset or == targetOffset
+        when(batch2.lastOffset()).thenReturn(12L); // >= targetOffset
+
+        FileRecords fileRecords = Mockito.spy(new FileRecords(mockFile, mockChannel, 0, 100, false));
+        mockFileRecordBatches(fileRecords, batch1, batch2);
+
+        long targetOffset = 10L;
+        FileRecords.LogOffsetPosition result = fileRecords.searchForOffsetWithSize(targetOffset, 0);
+
+        assertEquals(FileRecords.LogOffsetPosition.fromBatch(batch2), result);
+        if (targetOffset == baseOffset) {
+            // Because the target offset is equal to the base offset of the batch2, we should not call
+            // lastOffset on batch2
+            verify(batch1, times(1)).lastOffset();

Review Comment:
   We could optimize this case to avoid calling `lastOffset()`, right?
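
   A minimal sketch of how that short-circuit might look, not the actual `FileRecords` code: `OffsetSearchSketch`, `CountingBatch`, and `search` are hypothetical stand-ins, and the sketch assumes batches are sorted by offset and do not overlap. When a batch's base offset equals the target, it returns that batch right away without calling `lastOffset()` on any batch.

```java
import java.util.List;

public class OffsetSearchSketch {

    /** Hypothetical stand-in for a record batch; it counts lastOffset() calls. */
    static final class CountingBatch {
        private final long baseOffset;
        private final long lastOffset;
        int lastOffsetCalls = 0;

        CountingBatch(long baseOffset, long lastOffset) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
        }

        long baseOffset() {
            return baseOffset;
        }

        long lastOffset() {
            lastOffsetCalls++; // stands in for the call the tests try to avoid
            return lastOffset;
        }
    }

    /**
     * Returns the first batch whose last offset is >= targetOffset, or null if none.
     * Assumption: batches are sorted by offset and do not overlap.
     */
    static CountingBatch search(long targetOffset, List<CountingBatch> batches) {
        CountingBatch prev = null;
        for (CountingBatch batch : batches) {
            // Exact match on the base offset: no lastOffset() call is needed at all.
            if (batch.baseOffset() == targetOffset) {
                return batch;
            }
            // First batch that starts past the target: only now inspect the previous batch.
            if (batch.baseOffset() > targetOffset) {
                if (prev != null && prev.lastOffset() >= targetOffset) {
                    return prev;
                }
                return batch;
            }
            prev = batch;
        }
        // Every batch starts before the target, so only the last one can contain it.
        if (prev != null && prev.lastOffset() >= targetOffset) {
            return prev;
        }
        return null;
    }

    public static void main(String[] args) {
        // The last offset of batch1 (7) is an illustrative value, not taken from the PR.
        CountingBatch batch1 = new CountingBatch(5L, 7L);
        CountingBatch batch2 = new CountingBatch(10L, 12L);
        CountingBatch found = search(10L, List.of(batch1, batch2));
        System.out.println(found == batch2); // prints "true"
        System.out.println(batch1.lastOffsetCalls + batch2.lastOffsetCalls); // prints "0"
    }
}
```

   With this shape, the `targetOffset == baseOffset` branch of the test could assert `never()` for both batches.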



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -298,11 +298,28 @@ public int writeTo(TransferableChannel destChannel, int offset, int length) thro
      * @return the batch's base offset, its physical position, and its size (including log overhead)
      */
     public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) {
+        FileChannelRecordBatch prevBatch = null;

Review Comment:
   This is an existing issue, but the name `searchForOffsetWithSize` is a bit confusing. How about something like `searchForOffsetFromPosition`?



