abhijeetk88 commented on code in PR #20428:
URL: https://github.com/apache/kafka/pull/20428#discussion_r2616755197
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -135,6 +135,35 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+ val partition = replicaManager.getPartitionOrException(topicPartition)
+ val log = partition.localLogOrException
+
+ if (!log.remoteLogEnabled())
+ return new OffsetAndEpoch(-1L, -1)
+
+ val highestRemoteOffset = log.highestOffsetInRemoteStorage()
+
+ if (highestRemoteOffset == -1L) {
+ val localLogStartOffset = fetchEarliestLocalOffset(topicPartition,
currentLeaderEpoch)
+ val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+
+ if (localLogStartOffset.offset() == logStartOffset.offset()) {
+ // No segments have been uploaded yet
+ return logStartOffset;
+ } else {
+ // Leader currently does not know about the already uploaded segments
+ return new OffsetAndEpoch(-1L, -1);
+ }
+ }
+
+ val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+ val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
logStartOffset.offset())
+ val epoch =
log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
+
+ new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
Review Comment:
I see that for `fetchEarliestLocalOffset` also (line 135), we return 0 when
epoch is empty.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]