kamalcph commented on code in PR #17676:
URL: https://github.com/apache/kafka/pull/17676#discussion_r1831970411


##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -3484,6 +3486,75 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
         assertEquals(0, 
brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
     }
 
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, 
IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, 
scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        
when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition),
 anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    long offset = ans.getArgument(2);
+                    RemoteLogSegmentId segmentId = new 
RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                    RemoteLogSegmentMetadata segmentMetadata = 
createRemoteLogSegmentMetadata(segmentId,
+                            offset - 10, offset + 99, 1024, totalEpochEntries, 
RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                    return Optional.of(segmentMetadata);
+                });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        
when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), 
anyInt()))
+                .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new 
RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, 
time,
+                tp -> Optional.of(mockLog),
+                (topicPartition, offset) -> currentLogStartOffset.set(offset),
+                brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), 
Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new 
FetchRequest.PartitionData(
+                Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new 
RemoteStorageFetchInfo(
+                1048576, true, leaderTopicIdPartition.topicPartition(),
+                partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = 
remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, 
fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int 
nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, 
offset, compression, CREATE_TIME,
+                            records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();

Review Comment:
   The try-with-resources statement closes the `AutoCloseable` automatically, 
so it does not need to be closed explicitly. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to