[ 
https://issues.apache.org/jira/browse/KAFKA-17801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895129#comment-17895129
 ] 

Kamal Chandraprakash commented on KAFKA-17801:
----------------------------------------------

[~junrao]

Thanks for reporting this issue! It looks like a bug to me. We have another 
check in 
[TransactionIndex#collectAbortedTxns|https://sourcegraph.com/github.com/apache/[email protected]/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java?L169-180]
 which filters/discards the invalid entries. The fetchOffset is the 
{{firstBatch.baseOffset}}, so it might be working as expected. But, it is good 
to fix the reported issue.


{code:java}
public TxnIndexSearchResult collectAbortedTxns(long fetchOffset, long 
upperBoundOffset) {
  ...
  if (abortedTxn.lastOffset() >= fetchOffset && abortedTxn.firstOffset() < 
upperBoundOffset)
  ...
}
{code} 

> RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-17801
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17801
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.6.0
>            Reporter: Jun Rao
>            Priority: Major
>
> In RemoteLogManager.read, we compute startPos as the following.
> {code:java}
> startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);{code}
> This is the position returned by the offset index. The actual position for 
> the first batch being read happens in the following, but startPos is not 
> updated accordingly.
> {code:java}
> firstBatch = findFirstBatch(remoteLogInputStream, offset);{code}
> We then use the inaccurate startPos to create fetchDataInfo.
> {code:java}
> FetchDataInfo fetchDataInfo = new FetchDataInfo(
> new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), 
> startPos),
> MemoryRecords.readableRecords(buffer));{code}
> In addAbortedTransactions(), we use startPos to find the upperBoundOffset to 
> retrieve the aborted txns.
> {code:java}
> long upperBoundOffset = 
> offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
> .map(position -> position.offset).orElse(segmentMetadata.endOffset() + 
> 1);{code}
> The inaccurate startPos can lead to inaccurate upperBoundOffset, which leads 
> to inaccurate aborted txns returned to the consumer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to