peterxcli commented on code in PR #18096:
URL: https://github.com/apache/kafka/pull/18096#discussion_r1884354302


##########
core/src/main/java/kafka/server/share/ShareFetchUtils.java:
##########
@@ -161,6 +161,20 @@ static long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, ReplicaM
         return timestampAndOffset.get().offset;
     }
 
+    /**
+     * The method is used to get the offset for the given timestamp for the topic-partition.
+     *
+     * @return The offset for the given timestamp.
+     */
+    static long offsetForTimestamp(TopicIdPartition topicIdPartition, ReplicaManager replicaManager, long timestampToSearch, int leaderEpoch) {
+        Option<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
+            topicIdPartition.topicPartition(), timestampToSearch, new Some<>(IsolationLevel.READ_UNCOMMITTED), Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
+        if (timestampAndOffset.isEmpty()) {
+            throw new OffsetNotAvailableException("offset for timestamp " + timestampToSearch + " not found for topic partition: " + topicIdPartition);

Review Comment:
   Thank you very much for your careful review! 😁



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
