divijvaidya commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1827748304


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1748,29 +1769,47 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
-            if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
-                accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
+        long startTimeMs = time.milliseconds();
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache fileCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            if (!currentMetadata.isTxnIdxEmpty()) {
+                TransactionIndex txnIndex = indexCache.getIndexEntry(currentMetadata).txnIndex();

Review Comment:
   (optional)
   
   may I suggest encapsulating this into
   
   ```
   Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
       return !currentMetadata.isTxnIdxEmpty() ? Optional.of(indexCache.getIndexEntry(currentMetadata).txnIndex()) : Optional.empty();
   }
   ```
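   
   The loop body could then read something like this (just a sketch of the call site, assuming the helper above):
   
   ```
   Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
   if (txnIndexOpt.isPresent()) {
       TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
       accumulator.accept(searchResult.abortedTransactions);
       isSearchComplete = searchResult.isComplete;
   }
   ```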



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1748,29 +1769,47 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
-            if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
-                accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
+        long startTimeMs = time.milliseconds();

Review Comment:
   This would internally call `System.currentTimeMillis`, which is wall-clock time, so it can jump when the system clock is adjusted (for example, by NTP corrections or manual clock changes). Another alternative (very slightly more expensive) is `System.nanoTime()`, which is advertised [1] as the way to measure elapsed time such as this one. You can read about it here: https://www.baeldung.com/java-system-currenttimemillis-vs-system-nanotime
   
   Hence, could we switch to nanoTime here?
   
   [1] https://docs.oracle.com/javase/8/docs/api/java/lang/System.html#nanoTime--
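   
   For example (a sketch, assuming the `time` field here is Kafka's `org.apache.kafka.common.utils.Time`, which exposes `nanoseconds()`; requires `java.util.concurrent.TimeUnit`):
   
   ```
   // Monotonic clock: unaffected by wall-clock adjustments.
   long startNs = time.nanoseconds();
   // ... collect the aborted transactions ...
   long elapsedMs = TimeUnit.NANOSECONDS.toMillis(time.nanoseconds() - startNs);
   LOGGER.debug("Total time taken to collect aborted transactions is {} ms", elapsedMs);
   ```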
   



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
                 : Optional.empty();
     }
 
+    /**
+     * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned
+     * segment offsets are greater than or equal to the given offset.
+     * @param tp The topic partition.
+     * @param offset The offset to start the search.
+     * @param fileCache The leader epoch file cache.
+     * @return The next segment that contains the transaction index.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
+    Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
+                                                                   long offset,
+                                                                   LeaderEpochFileCache fileCache) throws RemoteStorageException {
+        Optional<RemoteLogSegmentMetadata> metadataOpt = Optional.empty();
+        if (fileCache != null) {

Review Comment:
   (optional)
   
   Can we please simplify this nested if/for?
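   
   For instance, an early return keeps the body flat (a sketch):
   
   ```
   if (fileCache == null) {
       return Optional.empty();
   }
   // ... the epoch/offset search continues here without the extra nesting ...
   ```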



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
                 : Optional.empty();
     }
 
+    /**
+     * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned
+     * segment offsets are greater than or equal to the given offset.
+     * @param tp The topic partition.
+     * @param offset The offset to start the search.
+     * @param fileCache The leader epoch file cache.

Review Comment:
   1. please add that this could be null
   2. please add "visible for testing" here
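   
   For example (the wording is only illustrative):
   
   ```
   * @param fileCache The leader epoch file cache; may be null.
   ```
   
   plus a `// Visible for testing.` comment (or whatever equivalent the codebase already uses) above the method.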



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1748,29 +1769,47 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
-            if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
-                accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
+        long startTimeMs = time.milliseconds();
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache fileCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            if (!currentMetadata.isTxnIdxEmpty()) {
+                TransactionIndex txnIndex = indexCache.getIndexEntry(currentMetadata).txnIndex();
+                if (txnIndex != null) {
+                    TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
+                    accumulator.accept(searchResult.abortedTransactions);
+                    isSearchComplete = searchResult.isComplete;
                 }
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, fileCache);

Review Comment:
   please rename fileCache to leaderEpochCache



##########
storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java:
##########
@@ -138,5 +192,6 @@ public void describe(PrintStream output) {
         output.println("  fetch-offset = " + fetchOffset);
         output.println("  expected-record-count = " + expectedTotalCount);
         output.println("  expected-record-from-tiered-storage = " + 
expectedFromSecondTierCount);
+        output.println(" remote-fetch-spec = " + remoteFetchSpec);

Review Comment:
   alignment with lines above (two leading spaces)
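   
   i.e.:
   
   ```
   output.println("  remote-fetch-spec = " + remoteFetchSpec);
   ```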



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetad
                 : Optional.empty();
     }
 
+    /**
+     * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned

Review Comment:
   Could we add over here that the next segment may be in a higher epoch and this function will search beyond the epoch containing offset?
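   
   For example (suggested wording only):
   
   ```
   * Note that the returned segment may belong to a leader epoch higher than the one containing the given offset,
   * i.e. the search does not stop at the epoch boundary.
   ```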



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java:
##########
@@ -194,6 +194,10 @@ public void sanityCheck() {
         }
     }
 
+    public boolean isEmpty() {

Review Comment:
   Documentation please.
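   
   Something along these lines, for example (a sketch; the exact wording should match the actual semantics of the index):
   
   ```
   /**
    * @return true if the transaction index contains no aborted transaction entries, false otherwise.
    */
   public boolean isEmpty() {
   ```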



##########
storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java:
##########
@@ -209,4 +209,18 @@ void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
      * @return Total size of the log stored in remote storage in bytes.
      */
     long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
+
+    /**
+     * Returns the next segment that contains the aborted txn entries for the given topic partition, epoch and offset.
+     * @param topicIdPartition topic partition to search for the next segment.
+     * @param epoch leader epoch of the txn index.
+     * @param offset offset
+     * @return the segment metadata that contains the txn index if exists. Otherwise, returns {@link Optional#empty()}.
+     * @throws RemoteStorageException if there are any storage related errors occurred.
+     */
+    default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,

Review Comment:
   we need a documentation update with this one since we are adding a new API 
after TS production readiness. Could you please make change in docs as well?
   
   I will suggest in "upgrading to 4.0" section, we can add a "For implementors 
of RLMM and RLM" section and add this new breaking change there.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1748,29 +1769,47 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
-            if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
-                accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
+        long startTimeMs = time.milliseconds();
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache fileCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            if (!currentMetadata.isTxnIdxEmpty()) {
+                TransactionIndex txnIndex = indexCache.getIndexEntry(currentMetadata).txnIndex();
+                if (txnIndex != null) {
+                    TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
+                    accumulator.accept(searchResult.abortedTransactions);
+                    isSearchComplete = searchResult.isComplete;
                 }
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, fileCache);
+            }
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+        LOGGER.debug("Total time taken to collect aborted transactions for {} is {} ms", segmentMetadata,

Review Comment:
   please add number of aborted transactions collected as well
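   
   One way to do that without changing the method's callers (a sketch; `countingAccumulator` is a hypothetical name, and it requires `java.util.concurrent.atomic.AtomicInteger`):
   
   ```
   AtomicInteger abortedTxnCount = new AtomicInteger();
   Consumer<List<AbortedTxn>> countingAccumulator = txns -> {
       abortedTxnCount.addAndGet(txns.size());
       accumulator.accept(txns);
   };
   // ... use countingAccumulator in place of accumulator above, then:
   LOGGER.debug("Collected {} aborted transactions for {} in {} ms",
           abortedTxnCount.get(), segmentMetadata, time.milliseconds() - startTimeMs);
   ```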



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
