kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1830353244
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1803,6 +1842,37 @@ Optional<RemoteLogSegmentMetadata>
findNextSegmentMetadata(RemoteLogSegmentMetad
: Optional.empty();
}
+ /**
+ * Returns the next segment that contains the aborted transaction entries. The search ensures that the returned
Review Comment:
done.
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1748,29 +1769,47 @@ private FetchDataInfo addAbortedTransactions(long
startOffset,
Optional.of(abortedTransactions.isEmpty() ?
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
}
+ /**
+ * Collects the aborted transaction entries from the current and
subsequent segments until the upper bound offset.
+ * Note that the accumulated aborted transaction entries might contain duplicates,
+ * because entries are collected across segments. We rely on the client to discard the duplicates.
+ * @param startOffset The start offset of the fetch request.
+ * @param upperBoundOffset The upper bound offset of the fetch request.
+ * @param segmentMetadata The current segment metadata.
+ * @param accumulator The accumulator to collect the aborted transactions.
+ * @param log The unified log instance.
+ * @throws RemoteStorageException If an error occurs while fetching the
remote log segment metadata.
+ */
private void collectAbortedTransactions(long startOffset,
long upperBoundOffset,
RemoteLogSegmentMetadata
segmentMetadata,
Consumer<List<AbortedTxn>>
accumulator,
UnifiedLog log) throws
RemoteStorageException {
- // Search in remote segments first.
- Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt =
Optional.of(segmentMetadata);
- while (nextSegmentMetadataOpt.isPresent()) {
- Optional<TransactionIndex> txnIndexOpt =
nextSegmentMetadataOpt.map(metadata ->
indexCache.getIndexEntry(metadata).txnIndex());
- if (txnIndexOpt.isPresent()) {
- TxnIndexSearchResult searchResult =
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
- accumulator.accept(searchResult.abortedTransactions);
- if (searchResult.isComplete) {
- // Return immediately when the search result is complete,
it does not need to go through local log segments.
- return;
+ long startTimeMs = time.milliseconds();
+ TopicPartition tp =
segmentMetadata.topicIdPartition().topicPartition();
+ boolean isSearchComplete = false;
+ LeaderEpochFileCache fileCache =
log.leaderEpochCache().getOrElse(null);
+ Optional<RemoteLogSegmentMetadata> currentMetadataOpt =
Optional.of(segmentMetadata);
+ while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+ RemoteLogSegmentMetadata currentMetadata =
currentMetadataOpt.get();
+ if (!currentMetadata.isTxnIdxEmpty()) {
+ TransactionIndex txnIndex =
indexCache.getIndexEntry(currentMetadata).txnIndex();
+ if (txnIndex != null) {
+ TxnIndexSearchResult searchResult =
txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
+ accumulator.accept(searchResult.abortedTransactions);
+ isSearchComplete = searchResult.isComplete;
}
}
-
- nextSegmentMetadataOpt =
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+ if (!isSearchComplete) {
+ currentMetadataOpt = findNextSegmentWithTxnIndex(tp,
currentMetadata.endOffset() + 1, fileCache);
Review Comment:
done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]