chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1728211668


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(

Review Comment:
   > For 3.9, I was thinking that we could change createRemoteLogManager() in 
BrokerServer to take the MV into consideration. If the MV is too low, we could 
just return None even if remote storage has been enabled. This is a bit tricky 
since MV is only available after the following statement.
   
   That is indeed a good protection, but as we discussed in another PR, the MV is dynamic, so we still need to check the version of the request sent by tiered storage at runtime, right?
   
   I guess the complete solution (making tiered storage work with a dynamic MV) is "feature version", but the "acceptable" fixes we can make for now are:
   
   1. Disable tiered storage at startup when the MV is <= 3.5. This CAN protect users from running tiered storage with an incorrect MV from the beginning, but it CAN NOT cope with a dynamic MV, i.e. tiered storage CAN NOT be re-enabled/re-disabled when the MV gets upgraded/downgraded.
   2. Produce an error when tiered storage sends a request with an incorrect MV. This CAN make "noise" for tiered storage when the MV gets upgraded/downgraded, but the error CAN NOT surface in time. That means the error happens only when tiered storage actually uses the request.
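   Option 1 could look roughly like the sketch below. All names here are illustrative stand-ins, not the actual `BrokerServer`/`MetadataVersion` API, and the minimum feature level is an assumed placeholder value: when the startup MV predates tiered-storage support, `createRemoteLogManager()` simply returns nothing even though remote storage is enabled in the config.
   
   ```java
   import java.util.Optional;
   
   class TieredStorageStartupGuard {
       // Stand-in for the real MetadataVersion in Kafka's server modules.
       record MetadataVersion(int featureLevel) {}
   
       // Placeholder for the real remote log manager.
       static class RemoteLogManager {}
   
       // Assumed minimum MV for tiered storage (illustrative value only).
       static final MetadataVersion MIN_TIERED_STORAGE_MV = new MetadataVersion(10);
   
       // Return empty when remote storage is disabled OR the startup MV is too
       // low, so the rest of the broker behaves as if tiered storage were off.
       static Optional<RemoteLogManager> createRemoteLogManager(
               boolean remoteStorageEnabled, MetadataVersion mv) {
           if (remoteStorageEnabled
                   && mv.featureLevel() >= MIN_TIERED_STORAGE_MV.featureLevel()) {
               return Optional.of(new RemoteLogManager());
           }
           return Optional.empty(); // MV too low: act as if disabled
       }
   }
   ```
   
   Note the limitation called out in point 1: this check runs once at startup, so it cannot react to a later MV upgrade/downgrade.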
   
   > this makes sense, but want to confirm -- has this been an issue since 
tiered storage was released?
   
   The issue we are discussing happens on the LIST_OFFSETS request, which is used to deal with `OFFSET_MOVED_TO_TIERED_STORAGE`. The error has been produced since 3.6 (https://github.com/apache/kafka/commit/6f197301646135e0bb39a461ca0a07c09c3185fb#diff-78812e247ffeae6f8c49b1b22506434701b1e1bafe7f92ef8f8708059e292bf0). That means the leader (broker) that can return `OFFSET_MOVED_TO_TIERED_STORAGE` is also able to handle `EARLIEST_LOCAL_TIMESTAMP`.
   
   So my first comment may not be correct: `those older brokers can't ever handle EARLIEST_LOCAL_TIMESTAMP correctly as they have no idea of EARLIEST_LOCAL_TIMESTAMP`.
   
   In short, we can let it go if all we want (or know) to fix is the LIST_OFFSETS request. Or we can add some protection (as above).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
