chia7712 commented on code in PR #16876: URL: https://github.com/apache/kafka/pull/16876#discussion_r1716674461
########## clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java: ########## @@ -95,13 +96,26 @@ public Builder setTargetTimes(List<ListOffsetsTopic> topics) { @Override public ListOffsetsRequest build(short version) { + data.topics() + .stream() + .flatMap(topic -> topic.partitions().stream()) + .forEach(partition -> checkVersion(version, partition)); + return new ListOffsetsRequest(data, version); } @Override public String toString() { return data.toString(); } + + private void checkVersion(short version, ListOffsetsPartition partition) { + long timestamp = partition.timestamp(); + if (timestamp == EARLIEST_LOCAL_TIMESTAMP && version < 8) + throw new UnsupportedVersionException("apiVersion must be >= 8 for EARLIEST_LOCAL_TIMESTAMP"); Review Comment: Could you please add the tp to the error message? that is helpful to users. ########## clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java: ########## @@ -146,4 +150,30 @@ public void testToListOffsetsTopics() { assertTrue(topic.partitions().contains(lop1)); } + @Test + public void testCheckVersion() { + testUnsupportedVersion(EARLIEST_LOCAL_TIMESTAMP, (short) 7); + testUnsupportedVersion(LATEST_TIERED_TIMESTAMP, (short) 8); + } + + private void testUnsupportedVersion(long timestamp, short version) { + List<ListOffsetsPartition> partitions = Collections.singletonList( + new ListOffsetsPartition().setPartitionIndex(0).setTimestamp(timestamp) + ); + + List<ListOffsetsTopic> topics = Collections.singletonList( + new ListOffsetsTopic() + .setName("topic") + .setPartitions(partitions) + ); + + ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder + .forConsumer(true, + IsolationLevel.READ_COMMITTED, + false, + false) + .setTargetTimes(topics); + + assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); Review Comment: Please check the error message to make sure that is expected error. ########## clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java: ########## @@ -146,4 +150,30 @@ public void testToListOffsetsTopics() { assertTrue(topic.partitions().contains(lop1)); } + @Test + public void testCheckVersion() { Review Comment: Could you please separate them into 2 test cases? that is more readable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org