kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1830352484
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
                 : Optional.empty();
     }
+    /**
+     * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned
+     * segment offsets are greater than or equal to the given offset.
+     * @param tp The topic partition.
+     * @param offset The offset to start the search.
+     * @param fileCache The leader epoch file cache.
+     * @return The next segment that contains the transaction index.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
+    Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
+                                                                   long offset,
+                                                                   LeaderEpochFileCache fileCache) throws RemoteStorageException {
+        Optional<RemoteLogSegmentMetadata> metadataOpt = Optional.empty();
+        if (fileCache != null) {
Review Comment:
One way to simplify this is to use multiple return statements. Please let me
know what you think; if it looks good, I'll update the PR. (In Scala, multiple
return statements are discouraged, see [The Point of No
Return](https://tpolecat.github.io/2014/05/09/return.html).)
```java
Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
                                                               long offset,
                                                               LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
    if (leaderEpochCache == null) {
        return Optional.empty();
    }
    OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset);
    if (initialEpochOpt.isEmpty()) {
        return Optional.empty();
    }
    int initialEpoch = initialEpochOpt.getAsInt();
    for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
        if (epochEntry.epoch >= initialEpoch) {
            long startOffset = Math.max(epochEntry.startOffset, offset);
            Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
            if (metadataOpt.isPresent()) {
                return metadataOpt;
            }
        }
    }
    return Optional.empty();
}
```
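To make the control flow easier to check in isolation, here is a minimal standalone sketch of the same early-return shape. Everything in it is a hypothetical stand-in rather than Kafka code: `EpochEntry` is a simplified copy of the idea, `SegmentLookup` replaces `fetchNextSegmentWithTxnIndex`, segments are plain strings, and `epochForOffset` is a simplified assumption about how the epoch for an offset is resolved, not the `LeaderEpochFileCache` implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

class EarlyReturnSearchSketch {

    // Hypothetical stand-in for an epoch entry: an epoch and the first offset written in it.
    static final class EpochEntry {
        final int epoch;
        final long startOffset;

        EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    // Hypothetical stand-in for the remote metadata lookup; a segment is just a String here.
    interface SegmentLookup {
        Optional<String> fetch(int epoch, long startOffset);
    }

    // Simplified assumption: the epoch of the last entry whose startOffset is <= offset.
    // Entries are expected to be sorted by startOffset.
    static OptionalInt epochForOffset(List<EpochEntry> entries, long offset) {
        OptionalInt result = OptionalInt.empty();
        for (EpochEntry entry : entries) {
            if (entry.startOffset > offset) {
                break;
            }
            result = OptionalInt.of(entry.epoch);
        }
        return result;
    }

    // Same shape as the suggestion above: guard clauses first, then return on the first hit.
    static Optional<String> findNext(List<EpochEntry> entries, long offset, SegmentLookup lookup) {
        if (entries == null) {
            return Optional.empty();
        }
        OptionalInt initialEpochOpt = epochForOffset(entries, offset);
        if (!initialEpochOpt.isPresent()) {
            return Optional.empty();
        }
        int initialEpoch = initialEpochOpt.getAsInt();
        for (EpochEntry entry : entries) {
            if (entry.epoch >= initialEpoch) {
                // Never search below the requested offset within an epoch.
                long startOffset = Math.max(entry.startOffset, offset);
                Optional<String> found = lookup.fetch(entry.epoch, startOffset);
                if (found.isPresent()) {
                    return found;
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<EpochEntry> entries = Arrays.asList(
                new EpochEntry(0, 0L), new EpochEntry(1, 100L), new EpochEntry(2, 200L));
        // Pretend that only epoch 2 has a segment with a transaction index.
        SegmentLookup lookup = (epoch, start) ->
                epoch == 2 ? Optional.of("seg-" + epoch + "@" + start) : Optional.<String>empty();
        System.out.println(findNext(entries, 150L, lookup));
    }
}
```

With the sample data in `main`, offset 150 resolves to epoch 1, the lookup for epoch 1 starting at offset 150 finds nothing, and the loop moves on to epoch 2 starting at offset 200, where it returns immediately. The guard clauses keep the loop at one level of nesting, which is the readability gain the suggestion is after.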