apoorvmittal10 commented on code in PR #16274: URL: https://github.com/apache/kafka/pull/16274#discussion_r1636681441
##########
core/src/main/java/kafka/server/SharePartition.java:
##########
@@ -238,8 +245,77 @@ public static RecordState forId(byte id) {
      * @return The next fetch offset that should be fetched from the leader.
      */
     public long nextFetchOffset() {
-        // TODO: Implement the logic to compute the next fetch offset.
-        return 0;
+        /*
+        The logic for determining the next offset to fetch data from a Share Partition hinges on a
+        flag called findNextFetchOffset. If this flag is set to true, then the next fetch offset
+        should be re-computed, otherwise the next fetch offset is Share Partition End Offset + 1.
+        The flag is set to true in the following cases:
+        1. When some previously acquired records are acknowledged with type RELEASE.
+        2. When the record lock duration expires for some acquired records.
+        3. When some records are released on share session close.
+        The re-computation of next fetch offset is done by iterating over the cachedState and finding
+        the first available record. If no available record is found, then the next fetch offset is
+        set to Share Partition End Offset + 1 and findNextFetchOffset flag is set to false.
+        */
+        lock.writeLock().lock();
+        try {
+            // When none of the records in the cachedState are in the AVAILABLE state, findNextFetchOffset will be false
+            if (!findNextFetchOffset.get()) {
+                if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset) {
+                    // 1. When cachedState is empty, endOffset is set to the next offset of the last offset removed from
+                    // batch, which is the next offset to be fetched.
+                    // 2. When startOffset has moved beyond the in-flight records, startOffset and endOffset point to the LSO,
+                    // which is the next offset to be fetched.
+                    return endOffset;

Review Comment:
Yeah, that's what my initial impression was. This concern only occurs when Log Start Offset (LSO) moves past the already fetched data.
In that case, we would like to fetch from the last known endOffset, which is the LSO; otherwise it will be endOffset + 1.
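
For illustration, here is a minimal sketch of the decision described above. It is not the code in this PR: the class `NextFetchOffsetSketch` and its `Batch` type are hypothetical, it assumes one state per batch, and it leaves out the write lock. Only the field names (`cachedState`, `findNextFetchOffset`, `startOffset`, `endOffset`) mirror the diff.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the share partition state; not the real SharePartition.
final class NextFetchOffsetSketch {

    // Assumption: a batch carries a single state, reduced here to "available or not".
    static final class Batch {
        final long lastOffset;
        final boolean available;

        Batch(long lastOffset, boolean available) {
            this.lastOffset = lastOffset;
            this.available = available;
        }
    }

    // In-flight batches keyed by their first offset, in ascending order.
    final TreeMap<Long, Batch> cachedState = new TreeMap<>();
    final AtomicBoolean findNextFetchOffset = new AtomicBoolean(false);
    long startOffset;
    long endOffset;

    long nextFetchOffset() {
        if (!findNextFetchOffset.get()) {
            // Empty cache, or the start offset has moved past every in-flight batch:
            // endOffset already points at the next offset to fetch.
            if (cachedState.isEmpty() || startOffset > cachedState.lastEntry().getValue().lastOffset) {
                return endOffset;
            }
            // Otherwise endOffset is the last in-flight offset, so fetch the one after it.
            return endOffset + 1;
        }
        // Flag is set: re-compute by scanning for the first available batch.
        for (Map.Entry<Long, Batch> entry : cachedState.entrySet()) {
            if (entry.getValue().available) {
                return entry.getKey();
            }
        }
        // Nothing is available any more, so clear the flag and continue past the end offset.
        findNextFetchOffset.set(false);
        return endOffset + 1;
    }
}
```

In this sketch the two `return endOffset` cases are the empty cache and the LSO having moved beyond the in-flight records; every other path with the flag unset yields `endOffset + 1`.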