chia7712 commented on code in PR #16876:
URL: https://github.com/apache/kafka/pull/16876#discussion_r1716674461


##########
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java:
##########
@@ -95,13 +96,26 @@ public Builder setTargetTimes(List<ListOffsetsTopic> topics) {
 
         @Override
         public ListOffsetsRequest build(short version) {
+            data.topics()
+                    .stream()
+                    .flatMap(topic -> topic.partitions().stream())
+                    .forEach(partition -> checkVersion(version, partition));
+
             return new ListOffsetsRequest(data, version);
         }
 
         @Override
         public String toString() {
             return data.toString();
         }
+
+        private void checkVersion(short version, ListOffsetsPartition partition) {
+            long timestamp = partition.timestamp();
+            if (timestamp == EARLIEST_LOCAL_TIMESTAMP && version < 8)
+                throw new UnsupportedVersionException("apiVersion must be >= 8 for EARLIEST_LOCAL_TIMESTAMP");

Review Comment:
   Could you please add the topic partition to the error message? That would be helpful to users.
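
   To illustrate the request, here is a rough, self-contained sketch of a check whose message names the affected partition. It is not the PR's code: the class name `VersionCheckSketch`, the extra `topic` and `partitionIndex` parameters, the nested exception (a stand-in for Kafka's `UnsupportedVersionException`), and the value `-4L` for the constant are all assumptions made so the example compiles on its own.

```java
public class VersionCheckSketch {
    // Assumed value for illustration; the real constant lives in ListOffsetsRequest.
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String message) {
            super(message);
        }
    }

    // Hypothetical signature: topic name and partition index are passed in
    // so that the message can tell the user which partition was rejected.
    static void checkVersion(short version, String topic, int partitionIndex, long timestamp) {
        if (timestamp == EARLIEST_LOCAL_TIMESTAMP && version < 8)
            throw new UnsupportedVersionException(
                    "apiVersion must be >= 8 for EARLIEST_LOCAL_TIMESTAMP"
                            + " (partition " + topic + "-" + partitionIndex + ")");
    }

    public static void main(String[] args) {
        try {
            checkVersion((short) 7, "topic", 0, EARLIEST_LOCAL_TIMESTAMP);
        } catch (UnsupportedVersionException e) {
            // The message now carries the topic and partition index.
            System.out.println(e.getMessage());
        }
    }
}
```

   In the actual builder the topic name would have to come from the enclosing `ListOffsetsTopic`, so the stream pipeline in `build` would need to keep the topic in scope when it visits each partition.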



##########
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java:
##########
@@ -146,4 +150,30 @@ public void testToListOffsetsTopics() {
         assertTrue(topic.partitions().contains(lop1));
     }
 
+    @Test
+    public void testCheckVersion() {
+        testUnsupportedVersion(EARLIEST_LOCAL_TIMESTAMP, (short) 7);
+        testUnsupportedVersion(LATEST_TIERED_TIMESTAMP, (short) 8);
+    }
+
+    private void testUnsupportedVersion(long timestamp, short version) {
+        List<ListOffsetsPartition> partitions = Collections.singletonList(
+                new ListOffsetsPartition().setPartitionIndex(0).setTimestamp(timestamp)
+        );
+
+        List<ListOffsetsTopic> topics = Collections.singletonList(
+                new ListOffsetsTopic()
+                .setName("topic")
+                .setPartitions(partitions)
+        );
+
+        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
+                .forConsumer(true,
+                        IsolationLevel.READ_COMMITTED,
+                        false,
+                        false)
+                .setTargetTimes(topics);
+
+        assertThrows(UnsupportedVersionException.class, () -> builder.build(version));

Review Comment:
   Please also check the error message to make sure this is the expected error.
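
   As a sketch of what that could look like: JUnit 5's `assertThrows` returns the thrown exception, so the test can keep the return value and assert on `getMessage()`. The plain-Java example below mimics that pattern without JUnit on the classpath. Everything in it is hypothetical: `expectThrows` is a minimal stand-in for `assertThrows`, `build` is a stand-in for `builder.build(version)`, and `UnsupportedOperationException` takes the place of Kafka's `UnsupportedVersionException`.

```java
public class ErrorMessageCheckSketch {
    // Hypothetical stand-in for builder.build(version) with an
    // EARLIEST_LOCAL_TIMESTAMP partition in the request.
    static Object build(short version) {
        if (version < 8)
            throw new UnsupportedOperationException(
                    "apiVersion must be >= 8 for EARLIEST_LOCAL_TIMESTAMP");
        return new Object();
    }

    // Minimal stand-in for JUnit 5's assertThrows: runs the action and
    // returns the exception if it has the expected type.
    static <T extends Throwable> T expectThrows(Class<T> type, Runnable action) {
        try {
            action.run();
        } catch (Throwable t) {
            if (type.isInstance(t))
                return type.cast(t);
            throw new AssertionError("unexpected exception type: " + t.getClass(), t);
        }
        throw new AssertionError("expected " + type.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        UnsupportedOperationException e =
                expectThrows(UnsupportedOperationException.class, () -> build((short) 7));
        // Checking the message distinguishes this failure from any other
        // code path that throws the same exception type.
        if (!e.getMessage().contains("EARLIEST_LOCAL_TIMESTAMP"))
            throw new AssertionError("unexpected message: " + e.getMessage());
    }
}
```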



##########
clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java:
##########
@@ -146,4 +150,30 @@ public void testToListOffsetsTopics() {
         assertTrue(topic.partitions().contains(lop1));
     }
 
+    @Test
+    public void testCheckVersion() {

Review Comment:
   Could you please split this into two test cases? That would be more readable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.