chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1730761499


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == 
ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the 
entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(

Review Comment:
   > +1 to add the validation to protect users in misconfiguring, it is good to 
add that validation in 3.9.
   
   @satishd thanks for confirmation. I'd like to keep discussing the validation 
for MV and remote storage.
   
   
   *zk broker*: the MV is static so we can check the MV when creating RLM.
   
   *kraft broker*: the MV is dynamic, and we all agree that RLM needs 
respective IBP (3.6). Maybe we can set the broker's min feature version of MV 
from `IBP_3_0_IV1` to `IBP_3_6_IV0` when RLM is enabled. That means the broker 
with RLM can't join the cluster which does not support the 3.6+. This changes 
also have following benefit. 
   
   1) we don't need to handle tricky MV sync ( 
https://github.com/apache/kafka/pull/16873#discussion_r1728047972)
   2) we don't need to worry the MV gets downgrade to 3.6-
   
   @junrao @satishd @jolshan @FrankYang0529 WDYT? 
   
   BTW, we have a ticker for the validation 
(https://issues.apache.org/jira/browse/KAFKA-17405), but we can reach the 
consensus here. I will copy the solution to the jira. In the mean time, 
@FrankYang0529 maybe you can address the `LIST_OFFSET`-related check in this PR 
(include this comment: 
https://github.com/apache/kafka/pull/16873#discussion_r1725714630)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to