kamalcph commented on code in PR #17659:
URL: https://github.com/apache/kafka/pull/17659#discussion_r1832738150
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1740,37 +1761,60 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
                 fetchInfo.records,
                 fetchInfo.firstEntryIncomplete,
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old events on which txnIdx may not be present.
Review Comment:
You may be referring to the producer-snapshot file, which wasn't present for all segments in pre-2.8 builds. The `txnIndex` is still optional in v3.9 when there are no aborted txn entries.
The backward compatibility mentioned in this comment is for old events stored in the `__remote_log_metadata` topic. Those old events return `txnIdxEmpty` as false, but the transaction index may not exist in remote storage. Updated the comment to clarify this.
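
The diff above is truncated at the commented line, so for context, here's a minimal sketch of how the guarded lookup might read in full. The body below is an assumption rather than the PR's exact code; `indexCache.getIndexEntry(...).txnIndex()` is taken from the removed lines in the diff:

```java
// Hypothetical completion of getTransactionIndex(), for illustration only.
// `ofNullable` tolerates a null txnIndex in the cache entry, which can happen
// for old events that report isTxnIdxEmpty() == false even though no index
// exists in remote storage.
private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
    return !currentMetadata.isTxnIdxEmpty()
            // `ofNullable` is needed for backward compatibility for old events on which txnIdx may not be present.
            ? Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex())
            : Optional.empty();
}
```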
In `RemoteIndexCache`, we already return an [empty byte-stream](https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L425) when the txn index does not exist in remote storage. This check is a safety measure in case we change the `RemoteIndexCache` implementation later.
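
For instance, the fallback behaves roughly like this (a sketch with a hypothetical helper name, not the cache's actual method; `fetchIndex`, `IndexType.TRANSACTION`, and `RemoteResourceNotFoundException` are the real remote-storage APIs):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

// Sketch of the fallback linked above (helper name is mine): a txn index that
// is missing in remote storage degrades to an empty byte-stream, so the
// rebuilt index simply contains no aborted-transaction entries.
static InputStream fetchTxnIndexOrEmpty(RemoteStorageManager rsm,
                                        RemoteLogSegmentMetadata metadata) throws RemoteStorageException {
    try {
        return rsm.fetchIndex(metadata, IndexType.TRANSACTION);
    } catch (RemoteResourceNotFoundException e) {
        // Old events may claim a txn index exists when it doesn't; treat it as empty.
        return new ByteArrayInputStream(new byte[0]);
    }
}
```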
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]