kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1830352484


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
                 : Optional.empty();
     }
 
+    /**
+     * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned
+     * segment offsets are greater than or equal to the given offset.
+     * @param tp The topic partition.
+     * @param offset The offset to start the search.
+     * @param fileCache The leader epoch file cache.
+     * @return The next segment that contains the transaction index.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
+    Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
+                                                                   long offset,
+                                                                   LeaderEpochFileCache fileCache) throws RemoteStorageException {
+        Optional<RemoteLogSegmentMetadata> metadataOpt = Optional.empty();
+        if (fileCache != null) {

Review Comment:
   One way to simplify this is to use multiple return statements. Please let me know what you think; if it looks good, I'll update the PR. (In Scala, multiple return statements are not recommended: [The Point of No Return](https://tpolecat.github.io/2014/05/09/return.html))
   
   ```java
   Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
                                                                  long offset,
                                                                  LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
       if (leaderEpochCache == null) {
           return Optional.empty();
       }
       OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset);
       if (initialEpochOpt.isEmpty()) {
           return Optional.empty();
       }
       int initialEpoch = initialEpochOpt.getAsInt();
       for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
           if (epochEntry.epoch >= initialEpoch) {
               long startOffset = Math.max(epochEntry.startOffset, offset);
               Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
               if (metadataOpt.isPresent()) {
                   return metadataOpt;
               }
           }
       }
       return Optional.empty();
   }
   ```
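
To make the control flow easier to check in isolation, here is a minimal, self-contained sketch of the same guard-clause shape. It does not use the Kafka classes: `Epoch`, `epochForOffset`, `findNext`, and the `lookup` callback are hypothetical stand-ins for `EpochEntry`, `LeaderEpochFileCache.epochForOffset`, the method above, and `fetchNextSegmentWithTxnIndex`. The stand-in `epochForOffset` assumes entries sorted by start offset, which may not match the real cache in every detail.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.function.BiFunction;

class EarlyReturnSketch {
    // Hypothetical stand-in for an epoch entry: an epoch and its first offset.
    static final class Epoch {
        final int epoch;
        final long startOffset;
        Epoch(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    // Assumed semantics: the epoch of the last entry that starts at or before the offset.
    static OptionalInt epochForOffset(List<Epoch> entries, long offset) {
        OptionalInt result = OptionalInt.empty();
        for (Epoch e : entries) {
            if (e.startOffset > offset) {
                break;
            }
            result = OptionalInt.of(e.epoch);
        }
        return result;
    }

    // Same shape as the suggestion: guard clauses first, then return on the first hit.
    static Optional<String> findNext(List<Epoch> entries, long offset,
                                     BiFunction<Integer, Long, Optional<String>> lookup) {
        if (entries == null) {
            return Optional.empty();
        }
        OptionalInt initialEpochOpt = epochForOffset(entries, offset);
        if (!initialEpochOpt.isPresent()) {
            return Optional.empty();
        }
        int initialEpoch = initialEpochOpt.getAsInt();
        for (Epoch e : entries) {
            if (e.epoch >= initialEpoch) {
                // Never search below the requested offset.
                long startOffset = Math.max(e.startOffset, offset);
                Optional<String> found = lookup.apply(e.epoch, startOffset);
                if (found.isPresent()) {
                    return found;
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Epoch> entries = Arrays.asList(new Epoch(0, 0L), new Epoch(1, 100L), new Epoch(2, 250L));
        // Hypothetical lookup: only epoch 2 has a segment with a transaction index.
        BiFunction<Integer, Long, Optional<String>> lookup =
            (epoch, start) -> epoch == 2 ? Optional.of("segment@" + start) : Optional.<String>empty();
        System.out.println(findNext(entries, 120L, lookup)); // prints Optional[segment@250]
    }
}
```

With offset 120 the search starts at epoch 1, finds nothing there, and then returns the epoch 2 result from its start offset 250, without any mutable `metadataOpt` carried across the loop.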


