chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1716499873
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1170,6 +1170,10 @@ class KafkaApis(val requestChannel: RequestChannel,
debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
s"failed because the partition is duplicated in the request.")
buildErrorResponse(Errors.INVALID_REQUEST, partition)
+ } else if (partition.timestamp() == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP && version < 8) {
+ throw new UnsupportedVersionException(s"apiVersion must be >=8 for EARLIEST_LOCAL_TIMESTAMP")
+ } else if (partition.timestamp() == ListOffsetsRequest.LATEST_TIERED_TIMESTAMP && version < 9) {
+ throw new UnsupportedVersionException(s"apiVersion must be >=9 for LATEST_TIERED_TIMESTAMP")
Review Comment:
> A more common case is that a correctly implemented client sends a new
version of the request to an old server that doesn't understand the new
version. To cover this case, we probably could change the following method in
UnifiedLog.
It seems to me the "new client" should throw an exception when building the
request [0]. Other requests have similar protection [1][2][3]. This part will
be implemented by @frankvicky.
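As a rough illustration of that client-side protection, here is a minimal Java sketch of a build-time check. It is not the actual `ListOffsetsRequest.Builder` code: the class `ListOffsetsVersionGuard`, its methods, and the nested exception are hypothetical stand-ins, and the sentinel values -4 and -5 are assumed to match the constants in `ListOffsetsRequest`. Only the version thresholds (8 and 9) come from the diff above.

```java
import java.util.List;

// Hypothetical sketch of a build-time version check; not Kafka's real builder.
public class ListOffsetsVersionGuard {
    // Assumed sentinel values for the special timestamps.
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
    static final long LATEST_TIERED_TIMESTAMP = -5L;

    // Local stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    // Lowest request version that understands the given timestamp sentinel.
    static int minVersionFor(long timestamp) {
        if (timestamp == LATEST_TIERED_TIMESTAMP) {
            return 9;
        }
        if (timestamp == EARLIEST_LOCAL_TIMESTAMP) {
            return 8;
        }
        return 0; // ordinary timestamps carry no extra version requirement here
    }

    // Fail fast on the client instead of sending a request the chosen version cannot express.
    static void validateBuild(int version, List<Long> timestamps) {
        for (long timestamp : timestamps) {
            int required = minVersionFor(timestamp);
            if (version < required) {
                throw new UnsupportedVersionException(
                    "apiVersion must be >=" + required + " for timestamp " + timestamp);
            }
        }
    }
}
```

With a check of this shape, a new client that negotiates version 7 with an old broker would fail locally when it asks for `EARLIEST_LOCAL_TIMESTAMP`, rather than relying on the broker to reject the request.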
> My main question is
https://github.com/apache/kafka/pull/16841#discussion_r1715730246. It's not
clear to me why there is no test failure when we set a stable MV depending on
an unstable ListOffset. It would be useful to understand the testing gap and
see if we could improve the test coverage.
The MV-related test will be addressed by
https://issues.apache.org/jira/browse/KAFKA-17336 rather than by this PR. This
PR makes the broker return the correct response (`unsupported version`) to
clients that send an old request version with a "new" timestamp. I have left a
summary at https://github.com/apache/kafka/pull/16841#discussion_r1715842281.
In short, we are trying to make sure that all paths (broker-2-broker,
old-client-2-new-broker, new-client-2-old-broker) work well and get the
correct error.
[0]
https://github.com/apache/kafka/blob/5b9cbcf886d0666849e81d0fbb8c19d3531c1143/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java#L97
[1]
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java#L42
[2]
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java#L45
[3]
https://github.com/apache/kafka/blob/75bcb9eb42fb305bce0eecd4dda265f935cbd108/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java#L43