chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1727662744


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == 
ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the 
entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(

Review Comment:
   In order to be a good friend to 3.9, I will file jira to trace the possible 
drastic change we discussed.
   
   **SHOULD** be included by this PR
   
   1. revert the support of EARLIEST_LOCAL_TIMESTAMP from v0 path. see comment: 
https://github.com/apache/kafka/pull/16873#discussion_r1725714630
   2. adjust version of `LIST_OFFSET`. the version must be >= 8  see code:
   
https://github.com/apache/kafka/blob/61a661ec5e627217e8b4e4c009d65ee0e0e938ba/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala#L107
   This can protect broker from sending request `EARLIEST_LOCAL_TIMESTAMP` to 
the old brokers which handle `EARLIEST_LOCAL_TIMESTAMP` as normal TS.
   
   **SHOULD NOT** be included by this PR
   1. adjust version of `FETCH`. the version=14 adds new error handle for 
`OffsetMovedToTieredStorageException` only. Hence, it is ok to send request to 
old brokers as they don't have tiered storage yet.
   2. enforce Tiered storage once at the feature version level. this will be 
traced by https://issues.apache.org/jira/browse/KAFKA-17405
   
   
   @FrankYang0529 @junrao @jolshan PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to