divijvaidya commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1832865482
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
 
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
                 fetchInfo.records,
                 fetchInfo.firstEntryIncomplete,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old events on which txnIdx may not be present.
Review Comment:
Got it! Yes, I was confused with the producer snapshot.
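
Editor's note on the duplicate handling described in the new Javadoc (entries may be collected more than once across segments, with the client expected to discard duplicates): below is a minimal, self-contained sketch, not Kafka code. The `AbortedTxnEntry` record and `dedupe` helper are invented for illustration only; they show how a consumer of such entries could drop duplicates by relying on value equality.

```java
import java.util.LinkedHashSet;
import java.util.List;

public class AbortedTxnDedupSketch {

    // Hypothetical stand-in for an aborted-transaction entry returned to the client.
    // A record gives value-based equals/hashCode, so identical entries compare equal.
    record AbortedTxnEntry(long producerId, long firstOffset) { }

    // Drop duplicates while preserving encounter order.
    static List<AbortedTxnEntry> dedupe(List<AbortedTxnEntry> entries) {
        return List.copyOf(new LinkedHashSet<>(entries));
    }

    public static void main(String[] args) {
        // Two adjacent segments may both report the same aborted transaction.
        List<AbortedTxnEntry> collected = List.of(
                new AbortedTxnEntry(7L, 100L),
                new AbortedTxnEntry(7L, 100L),   // duplicate reported by the next segment
                new AbortedTxnEntry(9L, 160L));

        // Prints only the two distinct entries.
        System.out.println(dedupe(collected));
    }
}
```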