divijvaidya commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1830881637


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -584,13 +584,32 @@ public Optional<RemoteLogSegmentMetadata> 
fetchRemoteLogSegmentMetadata(TopicPar
                                                                             
int epochForOffset,
                                                                             
long offset) throws RemoteStorageException {
         Uuid topicId = topicIdByPartitionMap.get(topicPartition);
-
         if (topicId == null) {
             throw new KafkaException("No topic id registered for topic 
partition: " + topicPartition);
         }
         return remoteLogMetadataManager.remoteLogSegmentMetadata(new 
TopicIdPartition(topicId, topicPartition), epochForOffset, offset);
     }
 
+    /**
+     * Returns the next segment that contains the aborted transaction entries. 
The search ensures that the returned

Review Comment:
   Suggest rewording to say that the returned segment "may" contain the aborted transaction entries, since containment is not guaranteed.



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##########
@@ -125,29 +125,43 @@ public boolean isInitialized() {
      * @return the requested remote log segment metadata if it exists.
      */
     public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
-        RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
-
-        if (remoteLogLeaderEpochState == null) {
-            return Optional.empty();
+        RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, 
offset);

Review Comment:
   Out of curiosity, where are we filtering for COPY_SEGMENT_FINISHED segments?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long 
startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in 
{} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
 
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
                 fetchInfo.records,
                 fetchInfo.firstEntryIncomplete,
                 Optional.of(abortedTransactions.isEmpty() ? 
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and 
subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain 
duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the 
remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata 
segmentMetadata,
                                             Consumer<List<AbortedTxn>> 
accumulator,
                                             UnifiedLog log) throws 
RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = 
Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = 
nextSegmentMetadataOpt.map(metadata -> 
indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = 
segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = 
log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = 
Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = 
currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = 
getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = 
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = 
txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, 
it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, 
currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = 
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, 
upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, 
upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> 
getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old 
events on which txnIdx may not be present.

Review Comment:
   Good catch! Perhaps we should add a note to the documentation of the `txnIndex()` 
method stating that a null return is expected for pre-2.8 data.
   



##########
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java:
##########
@@ -125,29 +125,43 @@ public boolean isInitialized() {
      * @return the requested remote log segment metadata if it exists.
      */
     public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int 
leaderEpoch, long offset) {
-        RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
-
-        if (remoteLogLeaderEpochState == null) {
-            return Optional.empty();
+        RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, 
offset);
+        long epochEndOffset = -1L;
+        if (metadata != null) {
+            // Check whether the given offset with leaderEpoch exists in this 
segment.
+            // Check for epoch's offset boundaries with in this segment.
+            //   1. Get the next epoch's start offset -1 if exists
+            //   2. If no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
+            Map.Entry<Integer, Long> nextEntry = 
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+            epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : 
metadata.endOffset();
         }
+        // Return empty when target offset > epoch's end offset.
+        return offset > epochEndOffset ? Optional.empty() : 
Optional.ofNullable(metadata);
+    }
 
-        // Look for floor entry as the given offset may exist in this entry.
-        RemoteLogSegmentId remoteLogSegmentId = 
remoteLogLeaderEpochState.floorEntry(offset);
-        if (remoteLogSegmentId == null) {
-            // If the offset is lower than the minimum offset available in 
metadata then return empty.
-            return Optional.empty();
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int 
leaderEpoch, long offset) {
+        boolean txnIdxEmpty = true;
+        Optional<RemoteLogSegmentMetadata> metadataOpt = 
remoteLogSegmentMetadata(leaderEpoch, offset);
+        while (metadataOpt.isPresent() && txnIdxEmpty) {
+            txnIdxEmpty = metadataOpt.get().isTxnIdxEmpty();
+            if (txnIdxEmpty) { // If txn index is empty, then look for next 
segment.
+                metadataOpt = remoteLogSegmentMetadata(leaderEpoch, 
metadataOpt.get().endOffset() + 1);
+            }
         }
+        return txnIdxEmpty ? Optional.empty() : metadataOpt;
+    }
 
-        RemoteLogSegmentMetadata metadata = 
idToSegmentMetadata.get(remoteLogSegmentId);
-        // Check whether the given offset with leaderEpoch exists in this 
segment.
-        // Check for epoch's offset boundaries with in this segment.
-        //      1. Get the next epoch's start offset -1 if exists
-        //      2. If no next epoch exists, then segment end offset can be 
considered as epoch's relative end offset.
-        Map.Entry<Integer, Long> nextEntry = 
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
-        long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : 
metadata.endOffset();
-
-        // Return empty when target offset > epoch's end offset.
-        return offset > epochEndOffset ? Optional.empty() : 
Optional.of(metadata);
+    private RemoteLogSegmentMetadata getSegmentMetadata(int leaderEpoch, long 
offset) {
+        RemoteLogLeaderEpochState remoteLogLeaderEpochState = 
leaderEpochEntries.get(leaderEpoch);
+        if (remoteLogLeaderEpochState != null) {
+            // Look for floor entry as the given offset may exist in this 
entry.
+            RemoteLogSegmentId remoteLogSegmentId = 
remoteLogLeaderEpochState.floorEntry(offset);
+            if (remoteLogSegmentId != null) {
+                return idToSegmentMetadata.get(remoteLogSegmentId);
+            }
+        }

Review Comment:
   In the `else` case, log a warning: it is unexpected for this method to be called with an epoch 
that is not part of the current epoch chain.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long 
startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in 
{} ns", abortedTransactions.size(),

Review Comment:
   Could we add the topic partition ID here, please?



##########
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java:
##########
@@ -209,4 +209,18 @@ void onPartitionLeadershipChanges(Set<TopicIdPartition> 
leaderPartitions,
      * @return Total size of the log stored in remote storage in bytes.
      */
     long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) 
throws RemoteStorageException;
+
+    /**
+     * Returns the next segment that contains the aborted txn entries for the 
given topic partition, epoch and offset.
+     * @param topicIdPartition topic partition to search for the next segment.
+     * @param epoch leader epoch of the txn index.
+     * @param offset offset
+     * @return the segment metadata that contains the txn index if exists. 
Otherwise, returns {@link Optional#empty()}.
+     * @throws RemoteStorageException if there are any storage related errors 
occurred.
+     */
+    default Optional<RemoteLogSegmentMetadata> 
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,

Review Comment:
   Sure, we will keep the JIRA open until the docs get merged in as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to