chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1716499873


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1170,6 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
               s"failed because the partition is duplicated in the request.")
           buildErrorResponse(Errors.INVALID_REQUEST, partition)
+        } else if (partition.timestamp() == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8) {
+          throw new UnsupportedVersionException(s"apiVersion must be >=8 for EARLIEST_LOCAL_TIMESTAMP")
+        } else if (partition.timestamp() == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP && version < 9) {
+          throw new UnsupportedVersionException(s"apiVersion must be >=9 for LATEST_TIERED_TIMESTAMP")

Review Comment:
   > A more common case is that a correctly implemented client sends a new 
version of the request to an old server that doesn't understand the new 
version. To cover this case, we probably could change the following method in 
UnifiedLog.
   
   It seems to me the "new client" should throw an exception when building the 
request [0]. Other requests have similar protection [1][2][3]. This part will 
be implemented by @frankvicky.
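   
   For illustration, a minimal sketch of what such a client-side guard could 
look like. This is not the actual builder code: the class 
`ListOffsetsVersionCheck`, the helpers `minVersionFor` and `validate`, and the 
local exception type are hypothetical stand-ins so the example compiles without 
Kafka on the classpath. The sentinel values -4 and -5 are assumed to mirror the 
constants in `ListOffsetsRequest`; the version thresholds 8 and 9 are taken 
from the diff above.
   
   ```java
   // Hypothetical, dependency-free sketch; not the real ListOffsetsRequest builder.
   public class ListOffsetsVersionCheck {

       // Assumed to mirror the sentinel constants defined in ListOffsetsRequest.
       static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
       static final long LATEST_TIERED_TIMESTAMP = -5L;

       // Local stand-in for Kafka's UnsupportedVersionException.
       static class UnsupportedVersionException extends RuntimeException {
           UnsupportedVersionException(String message) {
               super(message);
           }
       }

       // Lowest request version that understands the given timestamp.
       // Thresholds 8 and 9 come from the broker-side check in the diff.
       static short minVersionFor(long timestamp) {
           if (timestamp == LATEST_TIERED_TIMESTAMP) return 9;
           if (timestamp == EARLIEST_LOCAL_TIMESTAMP) return 8;
           return 0; // ordinary timestamps are not restricted by this sketch
       }

       // Fail while building the request instead of sending one the peer cannot honor.
       static void validate(long timestamp, short version) {
           short required = minVersionFor(timestamp);
           if (version < required) {
               throw new UnsupportedVersionException(
                   "apiVersion must be >=" + required + " for timestamp " + timestamp);
           }
       }

       public static void main(String[] args) {
           validate(EARLIEST_LOCAL_TIMESTAMP, (short) 8); // accepted
           try {
               validate(LATEST_TIERED_TIMESTAMP, (short) 8); // rejected: needs version 9
           } catch (UnsupportedVersionException e) {
               System.out.println("rejected: " + e.getMessage());
           }
       }
   }
   ```
   
   Doing the check at build time means the new-client-2-old-broker path fails 
on the client side, before any request is sent.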
   
   > My main question is 
https://github.com/apache/kafka/pull/16841#discussion_r1715730246. It's not 
clear to me why there is no test failure when we set a stable MV depending on 
an unstable ListOffset. It would be useful to understand the testing gap and 
see if we could improve the test coverage.
   
   The MV-related test will be addressed by 
https://issues.apache.org/jira/browse/KAFKA-17336 rather than by this PR. This 
PR makes the broker return the correct response (`unsupported version`) to a 
client that sends an old request version with a "new" timestamp. I have left a 
summary at 
https://github.com/apache/kafka/pull/16841#discussion_r1715842281. In short, we 
are trying to make sure all paths (broker-2-broker, old-client-2-new-broker, 
new-client-2-old-broker) work well and get the correct error.
   
   [0] 
https://github.com/apache/kafka/blob/5b9cbcf886d0666849e81d0fbb8c19d3531c1143/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java#L97
   [1] 
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java#L42
   [2] 
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java#L45
   [3] 
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java#L43



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
