kamalcph commented on code in PR #17676:
URL: https://github.com/apache/kafka/pull/17676#discussion_r1833653619
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1654,22 +1654,23 @@ public FetchDataInfo read(RemoteStorageFetchInfo
remoteStorageFetchInfo) throws
InputStream remoteSegInputStream = null;
try {
int startPos = 0;
- RecordBatch firstBatch = null;
-
+ EnrichedRecordBatch enrichedRecordBatch = new
EnrichedRecordBatch(null, 0);
// Iteration over multiple RemoteSegmentMetadata is required in
case of log compaction.
// It may be possible the offset is log compacted in the current
RemoteLogSegmentMetadata
// And we need to iterate over the next segment metadata to fetch
messages higher than the given offset.
- while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+ while (enrichedRecordBatch.batch == null &&
rlsMetadataOptional.isPresent()) {
remoteLogSegmentMetadata = rlsMetadataOptional.get();
// Search forward for the position of the last offset that is
greater than or equal to the target offset
startPos = lookupPositionForOffset(remoteLogSegmentMetadata,
offset);
remoteSegInputStream =
remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
RemoteLogInputStream remoteLogInputStream =
getRemoteLogInputStream(remoteSegInputStream);
- firstBatch = findFirstBatch(remoteLogInputStream, offset);
- if (firstBatch == null) {
+ enrichedRecordBatch = findFirstBatch(remoteLogInputStream,
offset);
+ if (enrichedRecordBatch.batch == null) {
+ Utils.closeQuietly(remoteSegInputStream,
"RemoteLogSegmentInputStream");
Review Comment:
Thanks for catching this! Updated the code to close the stream once.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]