junrao commented on code in PR #17676:
URL: https://github.com/apache/kafka/pull/17676#discussion_r1831532652
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -3484,6 +3486,75 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
    }
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+            .thenAnswer(ans -> {
+                long offset = ans.getArgument(2);
+                RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                    offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                return Optional.of(segmentMetadata);
+            });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+            .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+            tp -> Optional.of(mockLog),
+            (topicPartition, offset) -> currentLogStartOffset.set(offset),
+            brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+            Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+            1048576, true, leaderTopicIdPartition.topicPartition(),
+            partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                        records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();
Review Comment:
Should we close `fileRecords`?
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2163,4 +2167,26 @@ public String toString() {
                '}';
        }
    }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch() {
+            this(null, 0);
+        }
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+
+        public RecordBatch getBatch() {
Review Comment:
We don't use getters. So `getBatch` => `batch` and `getSkippedBytes` => `skippedBytes`.
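
Roughly what I had in mind (same class, just the accessors renamed; sketch only):

```java
    static class EnrichedRecordBatch {
        private final RecordBatch batch;
        private final int skippedBytes;

        public EnrichedRecordBatch() {
            this(null, 0);
        }

        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
            this.batch = batch;
            this.skippedBytes = skippedBytes;
        }

        // Accessors without the `get` prefix, matching the naming style we use elsewhere.
        public RecordBatch batch() {
            return batch;
        }

        public int skippedBytes() {
            return skippedBytes;
        }
    }
```

Call sites in `read(...)` would then use `enrichedRecordBatch.batch()` instead of `enrichedRecordBatch.getBatch()`.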
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2163,4 +2167,26 @@ public String toString() {
                '}';
        }
    }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch() {
+            this(null, 0);
+        }
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+
+        public RecordBatch getBatch() {
+            return batch;
+        }
+
+        public int getSkippedBytes() {
Review Comment:
This is unused.
##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -3484,6 +3486,75 @@ public void testTierLagResetsToZeroOnBecomingFollower() {
        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
    }
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+            .thenAnswer(ans -> {
+                long offset = ans.getArgument(2);
+                RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                    offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                return Optional.of(segmentMetadata);
+            });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+            .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+            tp -> Optional.of(mockLog),
+            (topicPartition, offset) -> currentLogStartOffset.set(offset),
+            brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+            Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+            1048576, true, leaderTopicIdPartition.topicPartition(),
+            partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                        records.toArray(new SimpleRecord[0])));
Review Comment:
`new SimpleRecord[0]` forces a new array to be created anyway. It's probably simpler to just do `records.toArray()`.
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1654,22 +1654,22 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
        InputStream remoteSegInputStream = null;
        try {
            int startPos = 0;
-            RecordBatch firstBatch = null;
-
+            EnrichedRecordBatch enrichedRecordBatch = new EnrichedRecordBatch();
            // Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
            // It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
            // And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
-            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+            while (enrichedRecordBatch.getBatch() == null && rlsMetadataOptional.isPresent()) {
                remoteLogSegmentMetadata = rlsMetadataOptional.get();
                // Search forward for the position of the last offset that is greater than or equal to the target offset
                startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
                remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
                RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream);
-                firstBatch = findFirstBatch(remoteLogInputStream, offset);
-                if (firstBatch == null) {
+                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
Review Comment:
This is an existing issue. We only close the last created `remoteSegInputStream`. It seems that we should close each one created in the while loop.
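
Something along these lines inside the loop would cover it (just a sketch; variable names follow the diff above, and `Utils.closeQuietly` is the helper from `org.apache.kafka.common.utils.Utils`):

```java
                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
                if (enrichedRecordBatch.getBatch() == null) {
                    // This stream will not back the returned FetchDataInfo, so release it
                    // before we move on to the next segment's metadata.
                    Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
                    remoteSegInputStream = null;
                }
```

Then the existing cleanup only ever has to close the stream that actually backs the batch we return.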
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]